Major refactor. eland is now backed by modin.

First push, still not functional.
Stephen Dodson 2019-07-04 13:00:19 +00:00
parent 5e10b2e818
commit 15e0c37182
24 changed files with 666 additions and 7854 deletions

View File

@ -1,7 +1,15 @@
import os
# Set modin to pandas to avoid starting ray or other
os.environ["MODIN_ENGINE"] = 'python'
os.environ["MODIN_BACKEND"] = 'pandas'
from .client import *
from .ndframe import *
from .index import *
from .mappings import *
from .operations import *
from .query_compiler import *
from .ndframe import *
from .series import *
from .dataframe import *
from .utils import *
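The two environment variables above steer modin onto its plain-pandas execution path so that simply importing eland does not spin up a Ray or Dask cluster; they have to be set before modin is imported, which is why they sit above the submodule imports. A minimal standalone sketch of the same pattern (not part of this commit; assumes modin is installed):
```python
# Sketch only: configure modin before its first import, as eland/__init__.py does above.
import os

os.environ["MODIN_ENGINE"] = "python"    # do not start ray/dask workers
os.environ["MODIN_BACKEND"] = "pandas"   # plain pandas execution

import modin.pandas as mpd  # modin reads the environment variables at import time

df = mpd.DataFrame({"a": [1, 2, 3]})
print(type(df))  # a modin DataFrame, executed by the pandas backend
```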

View File

@ -1,37 +1,34 @@
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class Client():
class Client:
"""
eland client - implemented as a facade to control access to Elasticsearch methods
"""
def __init__(self, es=None):
if isinstance(es, Elasticsearch):
self.es = es
self._es = es
elif isinstance(es, Client):
self.es = es.es
self._es = es._es
else:
self.es = Elasticsearch(es)
self._es = Elasticsearch(es)
def info(self):
return self.es.info()
def indices(self):
return self.es.indices
def get_mapping(self, **kwargs):
return self._es.indices.get_mapping(**kwargs)
def bulk(self, actions, refresh=False):
return helpers.bulk(self.es, actions, refresh=refresh)
return helpers.bulk(self._es, actions, refresh=refresh)
def scan(self, **kwargs):
return helpers.scan(self.es, **kwargs)
return helpers.scan(self._es, **kwargs)
def search(self, **kwargs):
return self.es.search(**kwargs)
return self._es.search(**kwargs)
def field_caps(self, **kwargs):
return self.es.field_caps(**kwargs)
return self._es.field_caps(**kwargs)
def count(self, **kwargs):
count_json = self.es.count(**kwargs)
count_json = self._es.count(**kwargs)
return count_json['count']
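Client is a thin facade: the rest of eland only reaches Elasticsearch through these wrapper methods and the private `_es` handle. A hedged usage sketch (the host and the 'flights' index are assumptions, not part of the diff):
```python
# Sketch of the facade usage (hypothetical host and index name).
from elasticsearch import Elasticsearch
from eland import Client

client = Client(Elasticsearch('localhost:9200'))  # wrap an existing client
# client = Client('localhost:9200')               # or let Client construct one
# client = Client(other_client)                   # or share another facade's connection

print(client.info()['version']['number'])         # proxied to Elasticsearch.info()
mapping = client.get_mapping(index='flights')     # proxied to indices.get_mapping()
doc_count = client.count(index='flights')         # returns count_json['count']
```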

View File

@ -1,394 +1,58 @@
"""
DataFrame
---------
An efficient 2D container for potentially mixed-type time series or other
labeled data series.
The underlying data resides in Elasticsearch and the API aligns as much as
possible with pandas.DataFrame API.
This allows the eland.DataFrame to access large datasets stored in Elasticsearch,
without storing the dataset in local memory.
Implementation Details
----------------------
Elasticsearch indexes can be configured in many different ways, and these indexes
utilise different data structures to pandas.DataFrame.
eland.DataFrame operations that return individual rows (e.g. df.head()) return
_source data. If _source is not enabled, this data is not accessible.
Similarly, only Elasticsearch searchable fields can be searched or filtered, and
only Elasticsearch aggregatable fields can be aggregated or grouped.
"""
import sys
from eland import NDFrame
import pandas as pd
from pandas.io.formats import format as fmt
from pandas.io.formats.printing import pprint_thing
from pandas.compat import StringIO
from pandas.io.common import _expand_user, _stringify_path
from pandas.io.formats import console
from pandas.core import common as com
from eland import NDFrame
from eland import Index
from eland import Series
class DataFrame(NDFrame):
"""
pandas.DataFrame like API that proxies into Elasticsearch index(es).
Parameters
----------
client : eland.Client
A reference to an Elasticsearch python client
index_pattern : str
An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).
See Also
--------
Examples
--------
import eland as ed
client = ed.Client(Elasticsearch())
df = ed.DataFrame(client, 'reviews')
df.head()
reviewerId vendorId rating date
0 0 0 5 2006-04-07 17:08
1 1 1 5 2006-05-04 12:16
2 2 2 4 2006-04-21 12:26
3 3 3 5 2006-04-18 15:48
4 3 4 5 2006-04-18 15:49
Notice that the types are based on Elasticsearch mappings
Notes
-----
If the Elasticsearch index is deleted or index mappings are changed after this
object is created, the object is not rebuilt and so inconsistencies can occur.
"""
# TODO create effectively 2 constructors
# 1. client, index_pattern, columns, index_field
# 2. query_compiler
def __init__(self,
client,
index_pattern,
mappings=None,
index_field=None):
client=None,
index_pattern=None,
columns=None,
index_field=None,
query_compiler=None):
# python 3 syntax
super().__init__(client, index_pattern, mappings=mappings, index_field=index_field)
super().__init__(
client=client,
index_pattern=index_pattern,
columns=columns,
index_field=index_field,
query_compiler=query_compiler)
def head(self, n=5):
return super()._head(n)
def _get_columns(self):
return self._query_compiler.columns
def tail(self, n=5):
return super()._tail(n)
def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
null_counts=None):
"""
Print a concise summary of a DataFrame.
This method prints information about a DataFrame including
the index dtype and column dtypes, non-null values and memory usage.
This copies a lot of code from pandas.DataFrame.info as it is difficult
to split out the appropriate code, and creating a SparseDataFrame gives
incorrect results on types and counts.
"""
if buf is None: # pragma: no cover
buf = sys.stdout
lines = []
lines.append(str(type(self)))
lines.append(self._index_summary())
if len(self.columns) == 0:
lines.append('Empty {name}'.format(name=type(self).__name__))
fmt.buffer_put_lines(buf, lines)
return
cols = self.columns
# hack
if max_cols is None:
max_cols = pd.get_option('display.max_info_columns',
len(self.columns) + 1)
max_rows = pd.get_option('display.max_info_rows', len(self) + 1)
if null_counts is None:
show_counts = ((len(self.columns) <= max_cols) and
(len(self) < max_rows))
else:
show_counts = null_counts
exceeds_info_cols = len(self.columns) > max_cols
def _verbose_repr():
lines.append('Data columns (total %d columns):' %
len(self.columns))
space = max(len(pprint_thing(k)) for k in self.columns) + 4
counts = None
tmpl = "{count}{dtype}"
if show_counts:
counts = self.count()
if len(cols) != len(counts): # pragma: no cover
raise AssertionError(
'Columns must equal counts '
'({cols:d} != {counts:d})'.format(
cols=len(cols), counts=len(counts)))
tmpl = "{count} non-null {dtype}"
dtypes = self.dtypes
for i, col in enumerate(self.columns):
dtype = dtypes.iloc[i]
col = pprint_thing(col)
count = ""
if show_counts:
count = counts.iloc[i]
lines.append(_put_str(col, space) + tmpl.format(count=count,
dtype=dtype))
def _non_verbose_repr():
lines.append(self.columns._summary(name='Columns'))
def _sizeof_fmt(num, size_qualifier):
# returns size in human readable format
for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
if num < 1024.0:
return ("{num:3.1f}{size_q} "
"{x}".format(num=num, size_q=size_qualifier, x=x))
num /= 1024.0
return "{num:3.1f}{size_q} {pb}".format(num=num,
size_q=size_qualifier,
pb='PB')
if verbose:
_verbose_repr()
elif verbose is False: # specifically set to False, not necessarily None
_non_verbose_repr()
else:
if exceeds_info_cols:
_non_verbose_repr()
else:
_verbose_repr()
counts = self.get_dtype_counts()
dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k
in sorted(counts.items())]
lines.append('dtypes: {types}'.format(types=', '.join(dtypes)))
if memory_usage is None:
memory_usage = pd.get_option('display.memory_usage')
if memory_usage:
# append memory usage of df to display
size_qualifier = ''
# TODO - this is different from pd.DataFrame as we shouldn't
# really hold much in memory. For now just approximate with getsizeof + ignore deep
mem_usage = sys.getsizeof(self)
lines.append("memory usage: {mem}\n".format(
mem=_sizeof_fmt(mem_usage, size_qualifier)))
fmt.buffer_put_lines(buf, lines)
columns = property(_get_columns)
@property
def shape(self):
def empty(self):
"""Determines if the DataFrame is empty.
Returns:
True if the DataFrame is empty.
False otherwise.
"""
Return a tuple representing the dimensionality of the DataFrame.
# TODO - this is called on every attribute get (most methods) from modin/pandas/base.py:3337
# (as Index.__len__ performs a query) we may want to cache self.index.empty()
return len(self.columns) == 0 or len(self.index) == 0
Returns
-------
shape: tuple
0 - number of rows
1 - number of columns
"""
num_rows = len(self)
num_columns = len(self.columns)
def head(self, n=5):
return super().head(n)
return num_rows, num_columns
def tail(self, n=5):
return super().tail(n)
def set_index(self, index_field):
copy = self.copy()
copy._index = Index(index_field)
return copy
def _index_summary(self):
head = self.head(1).index[0]
tail = self.tail(1).index[0]
index_summary = ', %s to %s' % (pprint_thing(head),
pprint_thing(tail))
name = "Index"
return '%s: %s entries%s' % (name, len(self), index_summary)
def count(self):
"""
Count non-NA cells for each column (TODO row)
Counts are based on exists queries against ES
This is inefficient, as it creates N queries (N is number of fields).
An alternative approach is to use value_count aggregations. However, they have issues in that:
1. They can only be used with aggregatable fields (e.g. keyword not text)
2. For list fields they return multiple counts. E.g. tags=['elastic', 'ml'] returns value_count=2
for a single document.
"""
counts = {}
for field in self._mappings.source_fields():
exists_query = {"query": {"exists": {"field": field}}}
field_exists_count = self._client.count(index=self._index_pattern, body=exists_query)
counts[field] = field_exists_count
count = pd.Series(data=counts, index=self._mappings.source_fields())
return count
def describe(self):
return super()._describe()
def __getitem__(self, key):
# NOTE: there is a difference from pandas here.
# e.g. df['a'] returns pd.Series, df[['a','b']] returns pd.DataFrame
# Implementation mainly copied from pandas v0.24.2
# (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html)
key = com.apply_if_callable(key, self)
# TODO - add slice capabilities - need to add index features first
# e.g. set index etc.
# Do we have a slicer (on rows)?
"""
indexer = convert_to_index_sliceable(self, key)
if indexer is not None:
return self._slice(indexer, axis=0)
# Do we have a (boolean) DataFrame?
if isinstance(key, DataFrame):
return self._getitem_frame(key)
"""
# Do we have a (boolean) 1d indexer?
"""
if com.is_bool_indexer(key):
return self._getitem_bool_array(key)
"""
# We are left with two options: a single key, and a collection of keys,
columns = []
is_single_key = False
if isinstance(key, str):
if not self._mappings.is_source_field(key):
raise TypeError('Column does not exist: [{0}]'.format(key))
columns.append(key)
is_single_key = True
elif isinstance(key, list):
columns.extend(key)
else:
raise TypeError('__getitem__ arguments invalid: [{0}]'.format(key))
mappings = self._filter_mappings(columns)
# Return new eland.DataFrame with modified mappings
if is_single_key:
return Series(self._client, self._index_pattern, mappings=mappings)
else:
return DataFrame(self._client, self._index_pattern, mappings=mappings)
def __getattr__(self, name):
# Note: obj.x will always call obj.__getattribute__('x') prior to
# calling obj.__getattr__('x').
mappings = self._filter_mappings([name])
return Series(self._client, self._index_pattern, mappings=mappings)
def copy(self):
# TODO - test and validate...may need deep copying
return DataFrame(self._client,
self._index_pattern,
self._mappings,
self._index)
# ----------------------------------------------------------------------
# Rendering Methods
def __repr__(self):
"""
From pandas
"""
buf = StringIO()
num_rows = pd.get_option("max_rows") or 60
num_cols = pd.get_option("max_columns") or 20
max_rows = pd.get_option("display.max_rows")
max_cols = pd.get_option("display.max_columns")
show_dimensions = pd.get_option("display.show_dimensions")
if pd.get_option("display.expand_frame_repr"):
width, _ = console.get_console_size()
result = repr(self._build_repr_df(num_rows, num_cols))
if len(self.index) > num_rows or len(self.columns) > num_cols:
# The split here is so that we don't repr pandas row lengths.
return result.rsplit("\n\n", 1)[0] + "\n\n[{0} rows x {1} columns]".format(
len(self.index), len(self.columns)
)
else:
width = None
self.to_string(buf=buf, max_rows=max_rows, max_cols=max_cols,
line_width=width, show_dimensions=show_dimensions)
return buf.getvalue()
def to_string(self, buf=None, columns=None, col_space=None, header=True,
index=True, na_rep='NaN', formatters=None, float_format=None,
sparsify=None, index_names=True, justify=None,
max_rows=None, max_cols=None, show_dimensions=True,
decimal='.', line_width=None):
"""
From pandas
"""
if max_rows == None:
max_rows = pd.get_option('display.max_rows')
df = self._fake_head_tail_df(max_rows=max_rows+1)
if buf is not None:
_buf = _expand_user(_stringify_path(buf))
else:
_buf = StringIO()
df.to_string(buf=_buf, columns=columns,
col_space=col_space, na_rep=na_rep,
formatters=formatters,
float_format=float_format,
sparsify=sparsify, justify=justify,
index_names=index_names,
header=header, index=index,
max_rows=max_rows,
max_cols=max_cols,
show_dimensions=False, # print this outside of this call
decimal=decimal,
line_width=line_width)
# Our fake dataframe has incorrect number of rows (max_rows*2+1) - write out
# the correct number of rows
if show_dimensions:
_buf.write("\n\n[{nrows} rows x {ncols} columns]"
.format(nrows=self._index_count(), ncols=len(self.columns)))
if buf is None:
result = _buf.getvalue()
return result
def to_pandas(self):
return super()._to_pandas()
# From pandas.DataFrame
def _put_str(s, space):
return '{s}'.format(s=s)[:space].ljust(space)
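After the refactor the DataFrame is a thin shell over the query compiler: head and tail only record work, and data leaves Elasticsearch when __repr__, to_string or to_pandas run. The commit message notes the code is not functional yet, so the following is only the intended flow (host and index name are assumptions):
```python
# Intended usage once the refactor is functional (sketch; 'localhost' and 'flights' are assumed).
import eland as ed

df = ed.DataFrame('localhost', 'flights')   # client spec + index pattern
top = df.head(10)                           # lazy: records a task, no search yet
print(top)                                  # __repr__ -> _build_repr_df -> to_pandas()
pd_df = top.to_pandas()                     # materialise the result as a pandas.DataFrame
```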

View File

@ -18,10 +18,12 @@ class Index:
ID_INDEX_FIELD = '_id'
ID_SORT_FIELD = '_doc' # if index field is _id, sort by _doc
def __init__(self, index_field=None):
def __init__(self, query_compiler, index_field=None):
# Calls setter
self.index_field = index_field
self._query_compiler = query_compiler
@property
def sort_field(self):
if self._index_field == self.ID_INDEX_FIELD:
@ -38,9 +40,12 @@ class Index:
@index_field.setter
def index_field(self, index_field):
if index_field == None:
if index_field == None or index_field == Index.ID_INDEX_FIELD:
self._index_field = Index.ID_INDEX_FIELD
self._is_source_field = False
else:
self._index_field = index_field
self._is_source_field = True
def __len__(self):
return self._query_compiler._index_count()
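The index field drives the sort key: an implicit `_id` index is not sortable, so `sort_field` falls back to `_doc`, while a real source field is used directly; `__len__` now delegates the document count to the query compiler. A small standalone sketch of that resolution logic (hypothetical helper, same rules as the setter above):
```python
# Sketch of the sort-field resolution described above (standalone helper, not committed code).
ID_INDEX_FIELD = '_id'
ID_SORT_FIELD = '_doc'  # _id is not sortable, so fall back to doc order

def resolve_sort_field(index_field):
    if index_field is None or index_field == ID_INDEX_FIELD:
        return ID_SORT_FIELD  # implicit _id index -> sort by _doc
    return index_field        # a source field can be sorted on directly

assert resolve_sort_field(None) == '_doc'
assert resolve_sort_field('_id') == '_doc'
assert resolve_sort_field('timestamp') == 'timestamp'
```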

View File

@ -4,7 +4,7 @@ import pandas as pd
from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype, is_string_dtype)
class Mappings():
class Mappings:
"""
General purpose to manage Elasticsearch to/from pandas mappings
@ -53,7 +53,7 @@ class Mappings():
Columns to copy
"""
if (client is not None) and (index_pattern is not None):
get_mapping = client.indices().get_mapping(index=index_pattern)
get_mapping = client.get_mapping(index=index_pattern)
# Get all fields (including all nested) and then field_caps
# for these names (fields=* doesn't appear to work effectively...)
@ -67,12 +67,8 @@ class Mappings():
# field_name, es_dtype, pd_dtype, is_searchable, is_aggregatable, is_source
self._mappings_capabilities = Mappings._create_capability_matrix(all_fields, source_fields, all_fields_caps)
else:
if columns is not None:
# Reference object and restrict mapping columns
self._mappings_capabilities = mappings._mappings_capabilities.loc[columns]
else:
# straight copy
self._mappings_capabilities = mappings._mappings_capabilities.copy()
# straight copy
self._mappings_capabilities = mappings._mappings_capabilities.copy()
# Cache source field types for efficient lookup
# (this massively improves performance of DataFrame.flatten)

View File

@ -22,350 +22,55 @@ Similarly, only Elasticsearch searchable fields can be searched or filtered, and
only Elasticsearch aggregatable fields can be aggregated or grouped.
"""
import pandas as pd
import functools
from elasticsearch_dsl import Search
import eland as ed
from modin.pandas.base import BasePandasDataset
from pandas.core.generic import NDFrame as pd_NDFrame
from pandas._libs import Timestamp, iNaT, properties
from eland import ElandQueryCompiler
class NDFrame():
"""
pandas.DataFrame/Series like API that proxies into Elasticsearch index(es).
class NDFrame(BasePandasDataset):
Parameters
----------
client : eland.Client
A reference to an Elasticsearch python client
index_pattern : str
An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).
See Also
--------
"""
def __init__(self,
client,
index_pattern,
mappings=None,
index_field=None):
self._client = ed.Client(client)
self._index_pattern = index_pattern
# Get and persist mappings, this allows us to correctly
# map returned types from Elasticsearch to pandas datatypes
if mappings is None:
self._mappings = ed.Mappings(self._client, self._index_pattern)
else:
self._mappings = mappings
self._index = ed.Index(index_field)
def _es_results_to_pandas(self, results):
client=None,
index_pattern=None,
columns=None,
index_field=None,
query_compiler=None):
"""
pandas.DataFrame/Series like API that proxies into Elasticsearch index(es).
Parameters
----------
results: dict
Elasticsearch results from self.client.search
Returns
-------
df: pandas.DataFrame
_source values extracted from results and mapped to pandas DataFrame
dtypes are mapped via Mapping object
Notes
-----
Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame
For example, an index with mapping:
```
"mappings" : {
"properties" : {
"group" : {
"type" : "keyword"
},
"user" : {
"type" : "nested",
"properties" : {
"first" : {
"type" : "keyword"
},
"last" : {
"type" : "keyword"
}
}
}
}
}
```
Adding a document:
```
"_source" : {
"group" : "amsterdam",
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]
}
```
(https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html)
this would be transformed internally (in Elasticsearch) into a document that looks more like this:
```
{
"group" : "amsterdam",
"user.first" : [ "alice", "john" ],
"user.last" : [ "smith", "white" ]
}
```
When mapping this to a pandas data frame we mimic this transformation.
Similarly, if a list is added to Elasticsearch:
```
PUT my_index/_doc/1
{
"list" : [
0, 1, 2
]
}
```
The mapping is:
```
"mappings" : {
"properties" : {
"user" : {
"type" : "long"
}
}
}
```
TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html)
TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
NOTE - using lists like this is generally not a good way to use this API
client : eland.Client
A reference to an Elasticsearch python client
"""
def flatten_dict(y):
out = {}
if query_compiler is None:
query_compiler = ElandQueryCompiler(client=client,
index_pattern=index_pattern,
columns=columns,
index_field=index_field)
self._query_compiler = query_compiler
def flatten(x, name=''):
# We flatten into source fields e.g. if type=geo_point
# location: {lat=52.38, lon=4.90}
if name == '':
is_source_field = False
pd_dtype = 'object'
else:
is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(name[:-1])
def _get_index(self):
return self._query_compiler.index
if not is_source_field and type(x) is dict:
for a in x:
flatten(x[a], name + a + '.')
elif not is_source_field and type(x) is list:
for a in x:
flatten(a, name)
elif is_source_field == True: # only print source fields from mappings (TODO - not so efficient for large number of fields and filtered mapping)
field_name = name[:-1]
index = property(_get_index)
# Coerce types - for now just datetime
if pd_dtype == 'datetime64[ns]':
x = pd.to_datetime(x)
def _build_repr_df(self, num_rows, num_cols):
# Overridden version of BasePandasDataset._build_repr_df
# to avoid issues with concat
if len(self.index) <= num_rows:
return self.to_pandas()
# Elasticsearch can have multiple values for a field. These are represented as lists, so
# create lists for this pivot (see notes above)
if field_name in out:
if type(out[field_name]) is not list:
l = [out[field_name]]
out[field_name] = l
out[field_name].append(x)
else:
out[field_name] = x
num_rows = num_rows + 1
flatten(y)
head_rows = int(num_rows / 2) + num_rows % 2
tail_rows = num_rows - head_rows
return out
rows = []
index = []
if isinstance(results, dict):
iterator = results['hits']['hits']
else:
iterator = results
for hit in iterator:
row = hit['_source']
# get index value - can be _id or can be field value in source
if self._index.is_source_field:
index_field = row[self._index.index_field]
else:
index_field = hit[self._index.index_field]
index.append(index_field)
# flatten row to map correctly to 2D DataFrame
rows.append(flatten_dict(row))
# Create pandas DataFrame
df = pd.DataFrame(data=rows, index=index)
# _source may not contain all columns in the mapping
# therefore, fill in missing columns
# (note this returns self.columns NOT IN df.columns)
missing_columns = list(set(self._columns) - set(df.columns))
for missing in missing_columns:
is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
df[missing] = None
df[missing].astype(pd_dtype)
# Sort columns in mapping order
df = df[self._columns]
return df
def _head(self, n=5):
"""
Protected method that returns head as pandas.DataFrame.
Returns
-------
_head
pandas.DataFrame of top N values
"""
sort_params = self._index.sort_field + ":asc"
results = self._client.search(index=self._index_pattern, size=n, sort=sort_params)
return self._es_results_to_pandas(results)
def _tail(self, n=5):
"""
Protected method that returns tail as pandas.DataFrame.
Returns
-------
_tail
pandas.DataFrame of last N values
"""
sort_params = self._index.sort_field + ":desc"
results = self._client.search(index=self._index_pattern, size=n, sort=sort_params)
df = self._es_results_to_pandas(results)
# reverse order (index ascending)
return df.sort_index()
def _to_pandas(self):
"""
Protected method that returns all data as pandas.DataFrame.
Returns
-------
df
pandas.DataFrame of all values
"""
sort_params = self._index.sort_field + ":asc"
results = self._client.scan(index=self._index_pattern)
# We sort here rather than in scan - once everything is in core this
# should be faster
return self._es_results_to_pandas(results)
def _describe(self):
numeric_source_fields = self._mappings.numeric_source_fields()
# for each field we compute:
# count, mean, std, min, 25%, 50%, 75%, max
search = Search(using=self._client, index=self._index_pattern).extra(size=0)
for field in numeric_source_fields:
search.aggs.metric('extended_stats_' + field, 'extended_stats', field=field)
search.aggs.metric('percentiles_' + field, 'percentiles', field=field)
response = search.execute()
results = {}
for field in numeric_source_fields:
values = list()
values.append(response.aggregations['extended_stats_' + field]['count'])
values.append(response.aggregations['extended_stats_' + field]['avg'])
values.append(response.aggregations['extended_stats_' + field]['std_deviation'])
values.append(response.aggregations['extended_stats_' + field]['min'])
values.append(response.aggregations['percentiles_' + field]['values']['25.0'])
values.append(response.aggregations['percentiles_' + field]['values']['50.0'])
values.append(response.aggregations['percentiles_' + field]['values']['75.0'])
values.append(response.aggregations['extended_stats_' + field]['max'])
# if not None
if values.count(None) < len(values):
results[field] = values
df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])
return df
def _filter_mappings(self, columns):
mappings = ed.Mappings(mappings=self._mappings, columns=columns)
return mappings
@property
def columns(self):
return self._columns
@property
def index(self):
return self._index
@property
def dtypes(self):
return self._mappings.dtypes()
@property
def _columns(self):
return pd.Index(self._mappings.source_fields())
def get_dtype_counts(self):
return self._mappings.get_dtype_counts()
def _index_count(self):
"""
Returns
-------
index_count: int
Count of docs where index_field exists
"""
exists_query = {"query": {"exists": {"field": self._index.index_field}}}
index_count = self._client.count(index=self._index_pattern, body=exists_query)
return index_count
def __len__(self):
"""
Returns length of info axis, but here we use the index.
"""
return self._client.count(index=self._index_pattern)
def _fake_head_tail_df(self, max_rows=1):
"""
Create a 'fake' pd.DataFrame of the entire ed.DataFrame
by concat head and tail. Used for display.
"""
head_rows = int(max_rows / 2) + max_rows % 2
tail_rows = max_rows - head_rows
head = self._head(head_rows)
tail = self._tail(tail_rows)
head = self.head(head_rows).to_pandas()
tail = self.tail(tail_rows).to_pandas()
return head.append(tail)
def to_pandas(self):
return self._query_compiler.to_pandas()
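`_build_repr_df` keeps display cheap: when the frame fits within `num_rows` it converts directly, otherwise it appears to fetch roughly half the rows from the head and half from the tail (mirroring the old `_fake_head_tail_df`) so that repr only ever issues two bounded queries. A runnable sketch of the row-split arithmetic (the helper name is hypothetical):
```python
# Sketch of the head/tail split used for repr (same arithmetic as _build_repr_df).
def head_tail_split(num_rows):
    num_rows = num_rows + 1                        # mirror the +1 in _build_repr_df
    head_rows = int(num_rows / 2) + num_rows % 2   # slightly more rows from the head
    tail_rows = num_rows - head_rows               # remainder from the tail
    return head_rows, tail_rows

print(head_tail_split(60))  # (31, 30): 31 rows from the head, 30 from the tail
```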

eland/operations.py (new file, 232 lines)
View File

@ -0,0 +1,232 @@
from enum import Enum
class Operations:
"""
A collector of the queries and selectors we apply to queries to return the appropriate results.
For example,
- a list of the columns in the DataFrame (a subset of columns in the index)
- a size limit on the results (e.g. for head(n=5))
- a query to filter the results (e.g. df.A > 10)
This is maintained as a 'task graph' (inspired by dask)
A task graph is a dictionary mapping keys to computations:
A key is any hashable value that is not a task:
```
{'x': 1,
'y': 2,
'z': (add, 'x', 'y'),
'w': (sum, ['x', 'y', 'z']),
'v': [(sum, ['w', 'z']), 2]}
```
(see https://docs.dask.org/en/latest/spec.html)
"""
class SortOrder(Enum):
ASC = 0
DESC = 1
@staticmethod
def reverse(order):
if order == Operations.SortOrder.ASC:
return Operations.SortOrder.DESC
return Operations.SortOrder.ASC
@staticmethod
def to_string(order):
if order == Operations.SortOrder.ASC:
return ":asc"
return ":desc"
def __init__(self, tasks=None):
if tasks == None:
self._tasks = []
else:
self._tasks = tasks
def __constructor__(self, *args, **kwargs):
return type(self)(*args, **kwargs)
def copy(self):
return self.__constructor__(tasks=self._tasks.copy())
def head(self, index, n):
# Add a task that is an ascending sort with size=n
task = ('head', (index.sort_field, n))
self._tasks.append(task)
def tail(self, index, n):
# Add a task that is descending sort with size=n
task = ('tail', (index.sort_field, n))
self._tasks.append(task)
def set_columns(self, columns):
self._tasks['columns'] = columns
def __repr__(self):
return repr(self._tasks)
def to_pandas(self, query_compiler):
query, post_processing = self._to_es_query()
size, sort_params = Operations._query_to_params(query)
es_results = query_compiler._client.search(
index=query_compiler._index_pattern,
size=size,
sort=sort_params)
df = query_compiler._es_results_to_pandas(es_results)
return self._apply_df_post_processing(df, post_processing)
def to_count(self, query_compiler):
query, post_processing = self._to_es_query()
size = query['query_size'] # can be None
pp_size = self._count_post_processing(post_processing)
if pp_size is not None:
if size is not None:
size = min(size, pp_size)
else:
size = pp_size
# Size is dictated by operations
if size is not None:
return size
exists_query = {"query": {"exists": {"field": query_compiler.index.index_field}}}
return query_compiler._client.count(index=query_compiler._index_pattern, body=exists_query)
@staticmethod
def _query_to_params(query):
sort_params = None
if query['query_sort_field'] and query['query_sort_order']:
sort_params = query['query_sort_field'] + Operations.SortOrder.to_string(query['query_sort_order'])
size = query['query_size']
return size, sort_params
@staticmethod
def _count_post_processing(post_processing):
size = None
for action in post_processing:
if action[0] == 'head' or action[0] == 'tail':
if size is None or action[1][1] < size:
size = action[1][1]
return size
@staticmethod
def _apply_df_post_processing(df, post_processing):
for action in post_processing:
print(action)
if action == 'sort_index':
df = df.sort_index()
elif action[0] == 'head':
df = df.head(action[1][1])
elif action[0] == 'tail':
df = df.tail(action[1][1])
return df
def _to_es_query(self):
# We now try and combine all tasks into an Elasticsearch query
# Some operations can be simply combined into a single query
# other operations require pre-queries and then combinations
# other operations require in-core post-processing of results
query = {"query_sort_field": None,
"query_sort_order": None,
"query_size": None}
post_processing = []
for task in self._tasks:
if task[0] == 'head':
query, post_processing = self._resolve_head(task, query, post_processing)
elif task[0] == 'tail':
query, post_processing = self._resolve_tail(task, query, post_processing)
return query, post_processing
def _resolve_head(self, item, query, post_processing):
# head - sort asc, size n
# |12345-------------|
query_sort_field = item[1][0]
query_sort_order = Operations.SortOrder.ASC
query_size = item[1][1]
# If we are already postprocessing the query results, we just get 'head' of these
# (note, currently we just append another head, we don't optimise by
# overwriting previous head)
if len(post_processing) > 0:
post_processing.append(item)
return query, post_processing
if query['query_sort_field'] is None:
query['query_sort_field'] = query_sort_field
# if it is already sorted we use existing field
if query['query_sort_order'] is None:
query['query_sort_order'] = query_sort_order
# if it is already sorted we get head of existing order
if query['query_size'] is None:
query['query_size'] = query_size
else:
# truncate if head is smaller
if query_size < query['query_size']:
query['query_size'] = query_size
return query, post_processing
def _resolve_tail(self, item, query, post_processing):
# tail - sort desc, size n, post-process sort asc
# |-------------12345|
query_sort_field = item[1][0]
query_sort_order = Operations.SortOrder.DESC
query_size = item[1][1]
# If this is a tail of a tail adjust settings and return
if query['query_size'] is not None and \
query['query_sort_order'] == query_sort_order and \
post_processing == [('sort_index')]:
if query_size < query['query_size']:
query['query_size'] = query_size
return query, post_processing
# If we are already postprocessing the query results, just get 'tail' of these
# (note, currently we just append another tail, we don't optimise by
# overwriting previous tail)
if len(post_processing) > 0:
post_processing.append(item)
return query, post_processing
# If results are already constrained, just get 'tail' of these
# (note, currently we just append another tail, we don't optimise by
# overwriting previous tail)
if query['query_size'] is not None:
post_processing.append(item)
return query, post_processing
else:
query['query_size'] = query_size
if query['query_sort_field'] is None:
query['query_sort_field'] = query_sort_field
if query['query_sort_order'] is None:
query['query_sort_order'] = query_sort_order
else:
# reverse sort order
query['query_sort_order'] = Operations.SortOrder.reverse(query_sort_order)
post_processing.append(('sort_index'))
return query, post_processing
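The task list is collapsed into a single Elasticsearch query plus in-core post-processing: for example `head(10)` followed by `tail(3)` becomes one ascending query of size 10 with the tail applied to the returned frame. A simplified, runnable sketch of that resolution for this specific case (field name and sizes are illustrative; it does not cover every branch of `_resolve_head`/`_resolve_tail`):
```python
# Simplified resolution of [head(10), tail(3)] (illustrative, not the committed code).
def resolve(tasks):
    query = {'query_sort_field': None, 'query_sort_order': None, 'query_size': None}
    post_processing = []
    for op, (field, n) in tasks:
        if op == 'head' and not post_processing:
            # push the head into the query itself
            query['query_sort_field'] = query['query_sort_field'] or field
            query['query_sort_order'] = query['query_sort_order'] or 'asc'
            query['query_size'] = n if query['query_size'] is None else min(n, query['query_size'])
        else:
            # size already constrained (or post-processing started): apply in core
            post_processing.append((op, (field, n)))
    return query, post_processing

query, post = resolve([('head', ('_doc', 10)), ('tail', ('_doc', 3))])
print(query)  # {'query_sort_field': '_doc', 'query_sort_order': 'asc', 'query_size': 10}
print(post)   # [('tail', ('_doc', 3))]
```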

eland/query_compiler.py (new file, 247 lines)
View File

@ -0,0 +1,247 @@
import pandas as pd
from modin.backends.base.query_compiler import BaseQueryCompiler
from eland import Client
from eland import Index
from eland import Mappings
from eland import Operations
class ElandQueryCompiler(BaseQueryCompiler):
def __init__(self,
client=None,
index_pattern=None,
columns=None,
index_field=None,
operations=None):
self._client = Client(client)
self._index_pattern = index_pattern
# Get and persist mappings, this allows us to correctly
# map returned types from Elasticsearch to pandas datatypes
self._mappings = Mappings(client=self._client, index_pattern=self._index_pattern)
self._index = Index(self, index_field)
if operations is None:
self._operations = Operations()
else:
self._operations = operations
def _get_index(self):
return self._index
def _get_columns(self):
return pd.Index(self._mappings.source_fields())
columns = property(_get_columns)
index = property(_get_index)
# END Index, columns, and dtypes objects
def _es_results_to_pandas(self, results):
"""
Parameters
----------
results: dict
Elasticsearch results from self.client.search
Returns
-------
df: pandas.DataFrame
_source values extracted from results and mapped to pandas DataFrame
dtypes are mapped via Mapping object
Notes
-----
Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame
For example, an index with mapping:
```
"mappings" : {
"properties" : {
"group" : {
"type" : "keyword"
},
"user" : {
"type" : "nested",
"properties" : {
"first" : {
"type" : "keyword"
},
"last" : {
"type" : "keyword"
}
}
}
}
}
```
Adding a document:
```
"_source" : {
"group" : "amsterdam",
"user" : [
{
"first" : "John",
"last" : "Smith"
},
{
"first" : "Alice",
"last" : "White"
}
]
}
```
(https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html)
this would be transformed internally (in Elasticsearch) into a document that looks more like this:
```
{
"group" : "amsterdam",
"user.first" : [ "alice", "john" ],
"user.last" : [ "smith", "white" ]
}
```
When mapping this to a pandas data frame we mimic this transformation.
Similarly, if a list is added to Elasticsearch:
```
PUT my_index/_doc/1
{
"list" : [
0, 1, 2
]
}
```
The mapping is:
```
"mappings" : {
"properties" : {
"user" : {
"type" : "long"
}
}
}
```
TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html)
TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
NOTE - using lists like this is generally not a good way to use this API
"""
def flatten_dict(y):
out = {}
def flatten(x, name=''):
# We flatten into source fields e.g. if type=geo_point
# location: {lat=52.38, lon=4.90}
if name == '':
is_source_field = False
pd_dtype = 'object'
else:
is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(name[:-1])
if not is_source_field and type(x) is dict:
for a in x:
flatten(x[a], name + a + '.')
elif not is_source_field and type(x) is list:
for a in x:
flatten(a, name)
elif is_source_field == True: # only print source fields from mappings (TODO - not so efficient for large number of fields and filtered mapping)
field_name = name[:-1]
# Coerce types - for now just datetime
if pd_dtype == 'datetime64[ns]':
x = pd.to_datetime(x)
# Elasticsearch can have multiple values for a field. These are represented as lists, so
# create lists for this pivot (see notes above)
if field_name in out:
if type(out[field_name]) is not list:
l = [out[field_name]]
out[field_name] = l
out[field_name].append(x)
else:
out[field_name] = x
flatten(y)
return out
rows = []
index = []
if isinstance(results, dict):
iterator = results['hits']['hits']
else:
iterator = results
for hit in iterator:
row = hit['_source']
# get index value - can be _id or can be field value in source
if self._index.is_source_field:
index_field = row[self._index.index_field]
else:
index_field = hit[self._index.index_field]
index.append(index_field)
# flatten row to map correctly to 2D DataFrame
rows.append(flatten_dict(row))
# Create pandas DataFrame
df = pd.DataFrame(data=rows, index=index)
# _source may not contain all columns in the mapping
# therefore, fill in missing columns
# (note this returns self.columns NOT IN df.columns)
missing_columns = list(set(self.columns) - set(df.columns))
for missing in missing_columns:
is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
df[missing] = None
df[missing].astype(pd_dtype)
# Sort columns in mapping order
df = df[self.columns]
return df
def _index_count(self):
"""
Returns
-------
index_count: int
Count of docs where index_field exists
"""
return self._operations.to_count(self)
def copy(self):
return self.__constructor__(
client=self._client,
index_pattern=self._index_pattern,
columns=self.columns,
index_field=self._index.index_field,
operations=self._operations.copy()
)
def head(self, n):
result = self.copy()
result._operations.head(self._index, n)
return result
def tail(self, n):
result = self.copy()
result._operations.tail(self._index, n)
return result
# To/From Pandas
def to_pandas(self):
"""Converts Eland DataFrame to Pandas DataFrame.
Returns:
Pandas DataFrame
"""
return self._operations.to_pandas(self)
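head and tail on the compiler never touch Elasticsearch: `copy()` clones the operations list, the new task is appended to the clone, and nothing executes until `to_pandas()` (or a count) is called. A hedged sketch of that lazy flow (hypothetical 'flights' index on localhost; requires a running cluster because the constructor fetches mappings):
```python
# Sketch of the lazy copy-on-write flow (assumed host and index; needs a live cluster).
from eland import ElandQueryCompiler

qc = ElandQueryCompiler(client='localhost', index_pattern='flights')
qc_head = qc.head(5)          # returns a copy with a ('head', ...) task appended
print(qc._operations)         # []  - the original compiler is untouched
print(qc_head._operations)    # [('head', ('_doc', 5))]

df = qc_head.to_pandas()      # only now is the Elasticsearch search executed
```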

View File

@ -1,402 +0,0 @@
"""
Series
---------
One-dimensional ndarray with axis labels (including time series).
The underlying data resides in Elasticsearch and the API aligns as much as
possible with pandas.DataFrame API.
This allows the eland.Series to access large datasets stored in Elasticsearch,
without storing the dataset in local memory.
Implementation Details
----------------------
Based on NDFrame which underpins eland.DataFrame
"""
import sys
import pandas as pd
import pandas.compat as compat
from pandas.compat import StringIO
from pandas.core.dtypes.common import (
is_categorical_dtype)
from pandas.io.formats import format as fmt
from pandas.io.formats.printing import pprint_thing
from eland import Index
from eland import NDFrame
class Series(NDFrame):
"""
pandas.Series like API that proxies into Elasticsearch index(es).
Parameters
----------
client : eland.Client
A reference to an Elasticsearch python client
index_pattern : str
An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).
field_name : str
The field to base the series on
See Also
--------
Examples
--------
import eland as ed
client = ed.Client(Elasticsearch())
s = ed.Series(client, 'reviews', 'date')
s.head()
reviewerId vendorId rating date
0 0 0 5 2006-04-07 17:08
1 1 1 5 2006-05-04 12:16
2 2 2 4 2006-04-21 12:26
3 3 3 5 2006-04-18 15:48
4 3 4 5 2006-04-18 15:49
Notice that the types are based on Elasticsearch mappings
Notes
-----
If the Elasticsearch index is deleted or index mappings are changed after this
object is created, the object is not rebuilt and so inconsistencies can occur.
"""
def __init__(self,
client,
index_pattern,
field_name=None,
mappings=None,
index_field=None):
# python 3 syntax
super().__init__(client, index_pattern, mappings=mappings, index_field=index_field)
# now select column (field_name)
if field_name is not None:
self._mappings = self._filter_mappings([field_name])
elif len(self._mappings.source_fields()) != 1:
raise TypeError('Series must have 1 field: [{0}]'.format(len(self._mappings.source_fields())))
def head(self, n=5):
return self._df_to_series(super()._head(n))
def tail(self, n=5):
return self._df_to_series(super()._tail(n))
def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
null_counts=None):
"""
Print a concise summary of a DataFrame.
This method prints information about a DataFrame including
the index dtype and column dtypes, non-null values and memory usage.
This copies a lot of code from pandas.DataFrame.info as it is difficult
to split out the appropriate code, and creating a SparseDataFrame gives
incorrect results on types and counts.
"""
if buf is None: # pragma: no cover
buf = sys.stdout
lines = []
lines.append(str(type(self)))
lines.append(self._index_summary())
if len(self.columns) == 0:
lines.append('Empty {name}'.format(name=type(self).__name__))
fmt.buffer_put_lines(buf, lines)
return
cols = self.columns
# hack
if max_cols is None:
max_cols = pd.get_option('display.max_info_columns',
len(self.columns) + 1)
max_rows = pd.get_option('display.max_info_rows', len(self) + 1)
if null_counts is None:
show_counts = ((len(self.columns) <= max_cols) and
(len(self) < max_rows))
else:
show_counts = null_counts
exceeds_info_cols = len(self.columns) > max_cols
def _verbose_repr():
lines.append('Data columns (total %d columns):' %
len(self.columns))
space = max(len(pprint_thing(k)) for k in self.columns) + 4
counts = None
tmpl = "{count}{dtype}"
if show_counts:
counts = self.count()
if len(cols) != len(counts): # pragma: no cover
raise AssertionError(
'Columns must equal counts '
'({cols:d} != {counts:d})'.format(
cols=len(cols), counts=len(counts)))
tmpl = "{count} non-null {dtype}"
dtypes = self.dtypes
for i, col in enumerate(self._columns):
dtype = dtypes.iloc[i]
col = pprint_thing(col)
count = ""
if show_counts:
count = counts.iloc[i]
lines.append(_put_str(col, space) + tmpl.format(count=count,
dtype=dtype))
def _non_verbose_repr():
lines.append(self._columns._summary(name='Columns'))
def _sizeof_fmt(num, size_qualifier):
# returns size in human readable format
for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
if num < 1024.0:
return ("{num:3.1f}{size_q} "
"{x}".format(num=num, size_q=size_qualifier, x=x))
num /= 1024.0
return "{num:3.1f}{size_q} {pb}".format(num=num,
size_q=size_qualifier,
pb='PB')
if verbose:
_verbose_repr()
elif verbose is False: # specifically set to False, not necessarily None
_non_verbose_repr()
else:
if exceeds_info_cols:
_non_verbose_repr()
else:
_verbose_repr()
counts = self.get_dtype_counts()
dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k
in sorted(counts.items())]
lines.append('dtypes: {types}'.format(types=', '.join(dtypes)))
if memory_usage is None:
memory_usage = pd.get_option('display.memory_usage')
if memory_usage:
# append memory usage of df to display
size_qualifier = ''
# TODO - this is different from pd.DataFrame as we shouldn't
# really hold much in memory. For now just approximate with getsizeof + ignore deep
mem_usage = sys.getsizeof(self)
lines.append("memory usage: {mem}\n".format(
mem=_sizeof_fmt(mem_usage, size_qualifier)))
fmt.buffer_put_lines(buf, lines)
@property
def name(self):
return list(self._mappings.source_fields())[0]
@property
def shape(self):
"""
Return a tuple representing the dimensionality of the DataFrame.
Returns
-------
shape: tuple
0 - number of rows
1 - number of columns
"""
num_rows = len(self)
num_columns = len(self._columns)
return num_rows, num_columns
@property
def set_index(self, index_field):
copy = self.copy()
copy._index = Index(index_field)
return copy
def _index_summary(self):
head = self.head(1).index[0]
tail = self.tail(1).index[0]
index_summary = ', %s to %s' % (pprint_thing(head),
pprint_thing(tail))
name = "Index"
return '%s: %s entries%s' % (name, len(self), index_summary)
def count(self):
"""
Count non-NA cells for each column (TODO row)
Counts are based on exists queries against ES
This is inefficient, as it creates N queries (N is number of fields).
An alternative approach is to use value_count aggregations. However, they have issues in that:
1. They can only be used with aggregatable fields (e.g. keyword not text)
2. For list fields they return multiple counts. E.g. tags=['elastic', 'ml'] returns value_count=2
for a single document.
"""
counts = {}
for field in self._mappings.source_fields():
exists_query = {"query": {"exists": {"field": field}}}
field_exists_count = self._client.count(index=self._index_pattern, body=exists_query)
counts[field] = field_exists_count
count = pd.Series(data=counts, index=self._mappings.source_fields())
return count
def describe(self):
return super()._describe()
def _df_to_series(self, df):
return df[self.name]
# ----------------------------------------------------------------------
# Rendering Methods
def __repr__(self):
"""
From pandas
"""
buf = StringIO()
max_rows = pd.get_option("display.max_rows")
self.to_string(buf=buf, na_rep='NaN', float_format=None, header=True, index=True, length=True,
dtype=True, name=True, max_rows=max_rows)
return buf.getvalue()
def to_string(self, buf=None, na_rep='NaN',
float_format=None, header=True,
index=True, length=True, dtype=True,
name=True, max_rows=None):
"""
From pandas 0.24.2
Render a string representation of the Series.
Parameters
----------
buf : StringIO-like, optional
buffer to write to
na_rep : string, optional
string representation of NAN to use, default 'NaN'
float_format : one-parameter function, optional
formatter function to apply to columns' elements if they are floats
default None
header : boolean, default True
Add the Series header (index name)
index : bool, optional
Add index (row) labels, default True
length : boolean, default False
Add the Series length
dtype : boolean, default False
Add the Series dtype
name : boolean, default False
Add the Series name if not None
max_rows : int, optional
Maximum number of rows to show before truncating. If None, show
all.
Returns
-------
formatted : string (if not buffer passed)
"""
if max_rows == None:
max_rows = pd.get_option("display.max_rows")
df = self._fake_head_tail_df(max_rows=max_rows + 1)
s = self._df_to_series(df)
formatter = Series.SeriesFormatter(s, len(self), name=name, length=length,
header=header, index=index,
dtype=dtype, na_rep=na_rep,
float_format=float_format,
max_rows=max_rows)
result = formatter.to_string()
# catch contract violations
if not isinstance(result, compat.text_type):
raise AssertionError("result must be of type unicode, type"
" of result is {0!r}"
"".format(result.__class__.__name__))
if buf is None:
return result
else:
try:
buf.write(result)
except AttributeError:
with open(buf, 'w') as f:
f.write(result)
class SeriesFormatter(fmt.SeriesFormatter):
"""
A hacked overridden version of pandas.io.formats.SeriesFormatter that writes correct length
"""
def __init__(self, series, series_length, buf=None, length=True, header=True, index=True,
na_rep='NaN', name=False, float_format=None, dtype=True,
max_rows=None):
super().__init__(series, buf=buf, length=length, header=header, index=index,
na_rep=na_rep, name=name, float_format=float_format, dtype=dtype,
max_rows=max_rows)
self._series_length = series_length
def _get_footer(self):
"""
Overridden with length change
(from pandas 0.24.2 io.formats.SeriesFormatter)
"""
name = self.series.name
footer = ''
if getattr(self.series.index, 'freq', None) is not None:
footer += 'Freq: {freq}'.format(freq=self.series.index.freqstr)
if self.name is not False and name is not None:
if footer:
footer += ', '
series_name = pprint_thing(name,
escape_chars=('\t', '\r', '\n'))
footer += ("Name: {sname}".format(sname=series_name)
if name is not None else "")
if (self.length is True or
(self.length == 'truncate' and self.truncate_v)):
if footer:
footer += ', '
footer += 'Length: {length}'.format(length=self._series_length)
if self.dtype is not False and self.dtype is not None:
name = getattr(self.tr_series.dtype, 'name', None)
if name:
if footer:
footer += ', '
footer += 'dtype: {typ}'.format(typ=pprint_thing(name))
# level infos are added to the end and in a new line, like it is done
# for Categoricals
if is_categorical_dtype(self.tr_series.dtype):
level_info = self.tr_series._values._repr_categories_info()
if footer:
footer += "\n"
footer += level_info
return compat.text_type(footer)

View File

@ -1,486 +0,0 @@
import os
import pandas as pd
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
# Define test files and indices
ELASTICSEARCH_HOST = 'localhost' # TODO externalise this
FLIGHTS_INDEX_NAME = 'flights'
FLIGHTS_MAPPING = { "mappings" : {
"properties" : {
"AvgTicketPrice" : {
"type" : "float"
},
"Cancelled" : {
"type" : "boolean"
},
"Carrier" : {
"type" : "keyword"
},
"Dest" : {
"type" : "keyword"
},
"DestAirportID" : {
"type" : "keyword"
},
"DestCityName" : {
"type" : "keyword"
},
"DestCountry" : {
"type" : "keyword"
},
"DestLocation" : {
"type" : "geo_point"
},
"DestRegion" : {
"type" : "keyword"
},
"DestWeather" : {
"type" : "keyword"
},
"DistanceKilometers" : {
"type" : "float"
},
"DistanceMiles" : {
"type" : "float"
},
"FlightDelay" : {
"type" : "boolean"
},
"FlightDelayMin" : {
"type" : "integer"
},
"FlightDelayType" : {
"type" : "keyword"
},
"FlightNum" : {
"type" : "keyword"
},
"FlightTimeHour" : {
"type" : "float"
},
"FlightTimeMin" : {
"type" : "float"
},
"Origin" : {
"type" : "keyword"
},
"OriginAirportID" : {
"type" : "keyword"
},
"OriginCityName" : {
"type" : "keyword"
},
"OriginCountry" : {
"type" : "keyword"
},
"OriginLocation" : {
"type" : "geo_point"
},
"OriginRegion" : {
"type" : "keyword"
},
"OriginWeather" : {
"type" : "keyword"
},
"dayOfWeek" : {
"type" : "integer"
},
"timestamp" : {
"type" : "date"
}
}
} }
FLIGHTS_FILE_NAME = ROOT_DIR + '/flights.json.gz'
FLIGHTS_DF_FILE_NAME = ROOT_DIR + '/flights_df.json.gz'
ECOMMERCE_INDEX_NAME = 'ecommerce'
ECOMMERCE_MAPPING = { "mappings" : {
"properties" : {
"category" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"currency" : {
"type" : "keyword"
},
"customer_birth_date" : {
"type" : "date"
},
"customer_first_name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"customer_full_name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"customer_gender" : {
"type" : "keyword"
},
"customer_id" : {
"type" : "keyword"
},
"customer_last_name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"customer_phone" : {
"type" : "keyword"
},
"day_of_week" : {
"type" : "keyword"
},
"day_of_week_i" : {
"type" : "integer"
},
"email" : {
"type" : "keyword"
},
"geoip" : {
"properties" : {
"city_name" : {
"type" : "keyword"
},
"continent_name" : {
"type" : "keyword"
},
"country_iso_code" : {
"type" : "keyword"
},
"location" : {
"type" : "geo_point"
},
"region_name" : {
"type" : "keyword"
}
}
},
"manufacturer" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"order_date" : {
"type" : "date"
},
"order_id" : {
"type" : "keyword"
},
"products" : {
"properties" : {
"_id" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"base_price" : {
"type" : "half_float"
},
"base_unit_price" : {
"type" : "half_float"
},
"category" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"created_on" : {
"type" : "date"
},
"discount_amount" : {
"type" : "half_float"
},
"discount_percentage" : {
"type" : "half_float"
},
"manufacturer" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"min_price" : {
"type" : "half_float"
},
"price" : {
"type" : "half_float"
},
"product_id" : {
"type" : "long"
},
"product_name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
},
"analyzer" : "english"
},
"quantity" : {
"type" : "integer"
},
"sku" : {
"type" : "keyword"
},
"tax_amount" : {
"type" : "half_float"
},
"taxful_price" : {
"type" : "half_float"
},
"taxless_price" : {
"type" : "half_float"
},
"unit_discount_amount" : {
"type" : "half_float"
}
}
},
"sku" : {
"type" : "keyword"
},
"taxful_total_price" : {
"type" : "half_float"
},
"taxless_total_price" : {
"type" : "half_float"
},
"total_quantity" : {
"type" : "integer"
},
"total_unique_products" : {
"type" : "integer"
},
"type" : {
"type" : "keyword"
},
"user" : {
"type" : "keyword"
}
}
} }
ECOMMERCE_FILE_NAME = ROOT_DIR + '/ecommerce.json.gz'
ECOMMERCE_DF_FILE_NAME = ROOT_DIR + '/ecommerce_df.json.gz'
TEST_MAPPING1 = {
'mappings': {
'properties': {
'city': {
'type': 'text',
'fields': {
'raw': {
'type': 'keyword'
}
}
},
'text': {
'type': 'text',
'fields': {
'english': {
'type': 'text',
'analyzer': 'english'
}
}
},
'origin_location': {
'properties': {
'lat': {
'type': 'text',
'index_prefixes': {},
'fields': {
'keyword': {
'type': 'keyword',
'ignore_above': 256
}
}
},
'lon': {
'type': 'text',
'fields': {
'keyword': {
'type': 'keyword',
'ignore_above': 256
}
}
}
}
},
'maps-telemetry': {
'properties': {
'attributesPerMap': {
'properties': {
'dataSourcesCount': {
'properties': {
'avg': {
'type': 'long'
},
'max': {
'type': 'long'
},
'min': {
'type': 'long'
}
}
},
'emsVectorLayersCount': {
'dynamic': 'true',
'properties': {
'france_departments': {
'properties': {
'avg': {
'type': 'float'
},
'max': {
'type': 'long'
},
'min': {
'type': 'long'
}
}
}
}
}
}
}
}
},
'type': {
'type': 'keyword'
},
'name': {
'type': 'text'
},
'user_name': {
'type': 'keyword'
},
'email': {
'type': 'keyword'
},
'content': {
'type': 'text'
},
'tweeted_at': {
'type': 'date'
},
'dest_location': {
'type': 'geo_point'
},
'my_join_field': {
'type': 'join',
'relations': {
'question': ['answer', 'comment'],
'answer': 'vote'
}
}
}
}
}
TEST_MAPPING1_INDEX_NAME = 'mapping1'
TEST_MAPPING1_EXPECTED = {
'city': 'text',
'city.raw': 'keyword',
'content': 'text',
'dest_location': 'geo_point',
'email': 'keyword',
'maps-telemetry.attributesPerMap.dataSourcesCount.avg': 'long',
'maps-telemetry.attributesPerMap.dataSourcesCount.max': 'long',
'maps-telemetry.attributesPerMap.dataSourcesCount.min': 'long',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.avg': 'float',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.max': 'long',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.min': 'long',
'my_join_field': 'join',
'name': 'text',
'origin_location.lat': 'text',
'origin_location.lat.keyword': 'keyword',
'origin_location.lon': 'text',
'origin_location.lon.keyword': 'keyword',
'text': 'text',
'text.english': 'text',
'tweeted_at': 'date',
'type': 'keyword',
'user_name': 'keyword'
}
TEST_MAPPING1_EXPECTED_DF = pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['es_dtype'])
TEST_MAPPING1_EXPECTED_SOURCE_FIELD_DF = TEST_MAPPING1_EXPECTED_DF.drop(index=['city.raw',
'origin_location.lat.keyword',
'origin_location.lon.keyword',
'text.english'])
TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT = len(TEST_MAPPING1_EXPECTED_SOURCE_FIELD_DF.index)
TEST_NESTED_USER_GROUP_INDEX_NAME = 'nested_user_group'
TEST_NESTED_USER_GROUP_MAPPING = {
'mappings': {
'properties': {
'group': {
'type': 'keyword'
},
'user': {
'properties': {
'first': {
'type': 'keyword'
},
'last': {
'type': 'keyword'
},
'address' : {
'type' : 'keyword'
}
}
}
}
}
}
TEST_NESTED_USER_GROUP_DOCS = [
{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME,
'_source':
{'group':'amsterdam','user':[
{'first':'Manke','last':'Nelis','address':['Elandsgracht', 'Amsterdam']},
{'first':'Johnny','last':'Jordaan','address':['Elandsstraat', 'Amsterdam']}]}},
{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME,
'_source':
{'group':'london','user':[
{'first':'Alice','last':'Monkton'},
{'first':'Jimmy','last':'White','address':['London']}]}},
{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME,
'_source':{'group':'new york','user':[
{'first':'Bill','last':'Jones'}]}}
]

View File

@ -1,124 +0,0 @@
# File called _pytest for PyCharm compatibility
import numpy as np
from pandas.util.testing import (
assert_series_equal, assert_frame_equal)
import eland as ed
from eland.tests import *
from eland.tests.common import TestData
class TestMapping(TestData):
# Requires 'setup_tests.py' to be run prior to this
def test_fields(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields()
assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings._mappings_capabilities['es_dtype']))
assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields()
def test_copy(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields()
assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings._mappings_capabilities['es_dtype']))
assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields()
# Pick 1 source field
columns = ['dest_location']
mappings_copy1 = ed.Mappings(mappings=mappings, columns=columns)
assert columns == mappings_copy1.all_fields()
assert len(columns) == mappings_copy1.count_source_fields()
# Pick 3 source fields (out of order)
columns = ['dest_location', 'city', 'user_name']
mappings_copy2 = ed.Mappings(mappings=mappings, columns=columns)
assert columns == mappings_copy2.all_fields()
assert len(columns) == mappings_copy2.count_source_fields()
# Check original is still ok
assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields()
assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings._mappings_capabilities['es_dtype']))
assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields()
def test_dtypes(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
expected_dtypes = pd.Series(
{'city': 'object', 'content': 'object', 'dest_location': 'object', 'email': 'object',
'maps-telemetry.attributesPerMap.dataSourcesCount.avg': 'int64',
'maps-telemetry.attributesPerMap.dataSourcesCount.max': 'int64',
'maps-telemetry.attributesPerMap.dataSourcesCount.min': 'int64',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.avg': 'float64',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.max': 'int64',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.min': 'int64',
'my_join_field': 'object', 'name': 'object', 'origin_location.lat': 'object',
'origin_location.lon': 'object', 'text': 'object', 'tweeted_at': 'datetime64[ns]',
'type': 'object', 'user_name': 'object'})
assert_series_equal(expected_dtypes, mappings.dtypes())
def test_get_dtype_counts(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
expected_get_dtype_counts = pd.Series({'datetime64[ns]': 1, 'float64': 1, 'int64': 5, 'object': 11})
assert_series_equal(expected_get_dtype_counts, mappings.get_dtype_counts())
def test_mapping_capabilities(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
field_capabilities = mappings.field_capabilities('city')
assert True == field_capabilities['_source']
assert 'text' == field_capabilities['es_dtype']
assert 'object' == field_capabilities['pd_dtype']
assert True == field_capabilities['searchable']
assert False == field_capabilities['aggregatable']
field_capabilities = mappings.field_capabilities('city.raw')
assert False == field_capabilities['_source']
assert 'keyword' == field_capabilities['es_dtype']
assert 'object' == field_capabilities['pd_dtype']
assert True == field_capabilities['searchable']
assert True == field_capabilities['aggregatable']
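# The 'city' / 'city.raw' behaviour above corresponds to a text field with a
# keyword sub-field in the index mapping. A hedged sketch (TEST_MAPPING1 itself
# may differ in detail):
#
#   'city': {'type': 'text', 'fields': {'raw': {'type': 'keyword'}}}
#
# The text field is searchable but not aggregatable and comes from _source,
# while the keyword sub-field is aggregatable but is not a _source field.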
def test_generate_es_mappings(self):
df = pd.DataFrame(data={'A': np.random.rand(3),
'B': 1,
'C': 'foo',
'D': pd.Timestamp('20190102'),
'E': [1.0, 2.0, 3.0],
'F': False,
'G': [1, 2, 3]},
index=['0','1','2'])
expected_mappings = {'mappings': {
'properties': {'A': {'type': 'double'},
'B': {'type': 'long'},
'C': {'type': 'keyword'},
'D': {'type': 'date'},
'E': {'type': 'double'},
'F': {'type': 'boolean'},
'G': {'type': 'long'}}}}
mappings = ed.Mappings._generate_es_mappings(df)
assert expected_mappings == mappings
# Now create index
index_name = 'eland_test_generate_es_mappings'
ed.pandas_to_es(df, ELASTICSEARCH_HOST, index_name, if_exists="replace", refresh=True)
ed_df = ed.DataFrame(ELASTICSEARCH_HOST, index_name)
ed_df_head = ed_df.head()
assert_frame_equal(df, ed_df_head)

View File

@ -1,44 +0,0 @@
import pytest
import eland as ed
import pandas as pd
import os
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
# Create pandas and eland data frames
from eland.tests import ELASTICSEARCH_HOST
from eland.tests import FLIGHTS_DF_FILE_NAME, FLIGHTS_INDEX_NAME,\
ECOMMERCE_DF_FILE_NAME, ECOMMERCE_INDEX_NAME
_pd_flights = pd.read_json(FLIGHTS_DF_FILE_NAME).sort_index()
_pd_flights['timestamp'] = \
pd.to_datetime(_pd_flights['timestamp'])
_pd_flights.index = _pd_flights.index.map(str) # make index 'object' not int
_ed_flights = ed.read_es(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME)
_pd_ecommerce = pd.read_json(ECOMMERCE_DF_FILE_NAME).sort_index()
_pd_ecommerce['order_date'] = \
pd.to_datetime(_pd_ecommerce['order_date'])
_pd_ecommerce['products.created_on'] = \
_pd_ecommerce['products.created_on'].apply(lambda x: pd.to_datetime(x))
_pd_ecommerce.insert(2, 'customer_birth_date', None)
_pd_ecommerce.index = _pd_ecommerce.index.map(str) # make index 'object' not int
_pd_ecommerce['customer_birth_date'] = \
    _pd_ecommerce['customer_birth_date'].astype('datetime64[ns]')
_ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME)
class TestData:
def pd_flights(self):
return _pd_flights
def ed_flights(self):
return _ed_flights
def pd_ecommerce(self):
return _pd_ecommerce
def ed_ecommerce(self):
return _ed_ecommerce
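# A minimal usage sketch: test classes inherit from TestData so each test can
# compare the pandas and eland views of the same index, e.g.
#
#   class TestDataFrameCount(TestData):          # hypothetical class name
#       def test_count(self):
#           assert_series_equal(self.pd_flights().count(),
#                               self.ed_flights().count())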

View File

@ -1,159 +0,0 @@
# File called _pytest for PyCharm compatibility
from eland.tests.common import TestData
import pandas as pd
import io
from pandas.util.testing import (
assert_series_equal, assert_frame_equal)
class TestDataFrameBasics(TestData):
def test_mapping(self):
ed_flights_mappings = pd.DataFrame(self.ed_flights()._mappings._mappings_capabilities
[self.ed_flights()._mappings._mappings_capabilities._source==True]
['pd_dtype'])
pd_flights_mappings = pd.DataFrame(self.pd_flights().dtypes, columns = ['pd_dtype'])
assert_frame_equal(pd_flights_mappings, ed_flights_mappings)
# We don't compare ecommerce here as the default dtypes in pandas from read_json
# don't match the mapping types. This is mainly because the products field is
# nested and so can be treated as a multi-field in ES, but not in pandas
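# (For example, 'products.created_on' - used in common.py - comes from the
# nested 'products' object in the ES mapping; pandas' read_json has no notion
# of nested mappings, so the dtypes it infers for these columns can differ.)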
def test_head(self):
pd_flights_head = self.pd_flights().head()
ed_flights_head = self.ed_flights().head()
print(ed_flights_head)
assert_frame_equal(pd_flights_head, ed_flights_head)
pd_ecommerce_head = self.pd_ecommerce().head()
ed_ecommerce_head = self.ed_ecommerce().head()
assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head)
def test_tail(self):
pd_flights_tail = self.pd_flights().tail()
ed_flights_tail = self.ed_flights().tail()
print(ed_flights_tail)
assert_frame_equal(pd_flights_tail, ed_flights_tail)
pd_ecommerce_tail = self.pd_ecommerce().tail()
ed_ecommerce_tail = self.ed_ecommerce().tail()
assert_frame_equal(pd_ecommerce_tail, ed_ecommerce_tail)
def test_describe(self):
pd_flights_describe = self.pd_flights().describe()
ed_flights_describe = self.ed_flights().describe()
print(ed_flights_describe)
# TODO - this comparison fails at the moment because ES aggregations are
# approximate; if the ES percentiles aggregation is configured with
#   "hdr": {
#       "number_of_significant_value_digits": 3
#   }
# the results are close enough for the assert below to pass.
# assert_almost_equal(pd_flights_describe, ed_flights_describe)
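# A hedged sketch of what such a percentiles aggregation body could look like
# ('AvgTicketPrice' is just an example field from the flights data):
#
#   "aggs": {
#       "percentiles_AvgTicketPrice": {
#           "percentiles": {
#               "field": "AvgTicketPrice",
#               "percents": [25, 50, 75],
#               "hdr": {"number_of_significant_value_digits": 3}
#           }
#       }
#   }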
pd_ecommerce_describe = self.pd_ecommerce().describe()
ed_ecommerce_describe = self.ed_ecommerce().describe()
print(ed_ecommerce_describe)
# We don't compare ecommerce here as the default dtypes in pandas from read_json
# don't match the mapping types. This is mainly because the products field is
# nested and so can be treated as a multi-field in ES, but not in pandas
def test_size(self):
assert self.pd_flights().shape == self.ed_flights().shape
assert len(self.pd_flights()) == len(self.ed_flights())
def test_to_string(self):
print(self.ed_flights())
print(self.ed_flights().to_string())
def test_info(self):
ed_flights_info_buf = io.StringIO()
pd_flights_info_buf = io.StringIO()
self.ed_flights().info(buf=ed_flights_info_buf)
self.pd_flights().info(buf=pd_flights_info_buf)
print(ed_flights_info_buf.getvalue())
ed_flights_info = (ed_flights_info_buf.getvalue().splitlines())
pd_flights_info = (pd_flights_info_buf.getvalue().splitlines())
flights_diff = set(ed_flights_info).symmetric_difference(set(pd_flights_info))
ed_ecommerce_info_buf = io.StringIO()
pd_ecommerce_info_buf = io.StringIO()
self.ed_ecommerce().info(buf=ed_ecommerce_info_buf)
self.pd_ecommerce().info(buf=pd_ecommerce_info_buf)
ed_ecommerce_info = (ed_ecommerce_info_buf.getvalue().splitlines())
pd_ecommerce_info = (pd_ecommerce_info_buf.getvalue().splitlines())
# We don't compare ecommerce here as the default dtypes in pandas from read_json
# don't match the mapping types. This is mainly because the products field is
# nested and so can be treated as a multi-field in ES, but not in pandas
ecommerce_diff = set(ed_ecommerce_info).symmetric_difference(set(pd_ecommerce_info))
def test_count(self):
pd_flights_count = self.pd_flights().count()
ed_flights_count = self.ed_flights().count()
assert_series_equal(pd_flights_count, ed_flights_count)
pd_ecommerce_count = self.pd_ecommerce().count()
ed_ecommerce_count = self.ed_ecommerce().count()
assert_series_equal(pd_ecommerce_count, ed_ecommerce_count)
def test_get_dtype_counts(self):
pd_flights_get_dtype_counts = self.pd_flights().get_dtype_counts().sort_index()
ed_flights_get_dtype_counts = self.ed_flights().get_dtype_counts().sort_index()
assert_series_equal(pd_flights_get_dtype_counts, ed_flights_get_dtype_counts)
def test_get_properties(self):
pd_flights_shape = self.pd_flights().shape
ed_flights_shape = self.ed_flights().shape
assert pd_flights_shape == ed_flights_shape
pd_flights_columns = self.pd_flights().columns
ed_flights_columns = self.ed_flights().columns
assert pd_flights_columns.tolist() == ed_flights_columns.tolist()
pd_flights_dtypes = self.pd_flights().dtypes
ed_flights_dtypes = self.ed_flights().dtypes
assert_series_equal(pd_flights_dtypes, ed_flights_dtypes)
def test_index(self):
pd_flights = self.pd_flights()
pd_flights_timestamp = pd_flights.set_index('timestamp')
pd_flights.info()
pd_flights_timestamp.info()
pd_flights.info()
ed_flights = self.ed_flights()
ed_flights_timestamp = ed_flights.set_index('timestamp')
ed_flights.info()
ed_flights_timestamp.info()
ed_flights.info()
def test_to_pandas(self):
ed_ecommerce_pd_df = self.ed_ecommerce().to_pandas()
assert_frame_equal(self.pd_ecommerce(), ed_ecommerce_pd_df)

View File

@ -1,47 +0,0 @@
# File called _pytest for PyCharm compatibility
from eland.tests.common import TestData
import pandas as pd
import io
from pandas.util.testing import (
assert_series_equal, assert_frame_equal)
class TestDataFrameGetItem(TestData):
def test_getitem_basic(self):
# Test 1 attribute
pd_carrier = self.pd_flights()['Carrier']
ed_carrier = self.ed_flights()['Carrier']
# pandas returns a Series here
assert_series_equal(pd_carrier.head(100), ed_carrier.head(100))
pd_3_items = self.pd_flights()[['Dest','Carrier','FlightDelay']]
ed_3_items = self.ed_flights()[['Dest','Carrier','FlightDelay']]
assert_frame_equal(pd_3_items.head(100), ed_3_items.head(100))
# Test numerics
numerics = ['DistanceMiles', 'AvgTicketPrice', 'FlightTimeMin']
ed_numerics = self.ed_flights()[numerics]
pd_numerics = self.pd_flights()[numerics]
assert_frame_equal(pd_numerics.head(100), ed_numerics.head(100))
# just test headers
ed_numerics_describe = ed_numerics.describe()
assert ed_numerics_describe.columns.tolist() == numerics
def test_getattr_basic(self):
# Test 1 attribute
pd_carrier = self.pd_flights().Carrier
ed_carrier = self.ed_flights().Carrier
assert_series_equal(pd_carrier.head(100), ed_carrier.head(100))
pd_avgticketprice = self.pd_flights().AvgTicketPrice
ed_avgticketprice = self.ed_flights().AvgTicketPrice
assert_series_equal(pd_avgticketprice.head(100), ed_avgticketprice.head(100))

View File

@ -0,0 +1,79 @@
# File called _pytest for PyCharm compatibility
import pandas as pd
import io
import eland as ed
from pandas.util.testing import (
assert_series_equal, assert_frame_equal)
class TestDataFrameHeadTail():
def test_head(self):
ed_flights = ed.read_es(es_params='localhost', index_pattern='flights')
head_10 = ed_flights.head(10)
print(head_10._query_compiler._operations._to_es_query())
head_8 = head_10.head(8)
print(head_8._query_compiler._operations._to_es_query())
head_20 = head_10.head(20)
print(head_20._query_compiler._operations._to_es_query())
def test_tail(self):
ed_flights = ed.read_es(es_params='localhost', index_pattern='flights')
tail_10 = ed_flights.tail(10)
print(tail_10._query_compiler._operations._to_es_query())
print(tail_10)
tail_8 = tail_10.tail(8)
print(tail_8._query_compiler._operations._to_es_query())
tail_20 = tail_10.tail(20)
print(tail_20._query_compiler._operations._to_es_query())
def test_head_tail(self):
ed_flights = ed.read_es(es_params='localhost', index_pattern='flights')
head_10 = ed_flights.head(10)
print(head_10._query_compiler._operations._to_es_query())
tail_8 = head_10.tail(8)
print(tail_8._query_compiler._operations._to_es_query())
tail_5 = tail_8.tail(5)
print(tail_5._query_compiler._operations._to_es_query())
head_4 = tail_5.head(4)
print(head_4._query_compiler._operations._to_es_query())
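# For reference, the pandas-equivalent selection for the chain above:
# head(10) keeps rows 0-9 of the frame, tail(8) of that keeps rows 2-9,
# tail(5) keeps rows 5-9 and head(4) keeps rows 5-8, so each printed ES
# query should describe a progressively narrower window of the same rows.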
def test_tail_head(self):
ed_flights = ed.read_es(es_params='localhost', index_pattern='flights')
tail_10 = ed_flights.tail(10)
print(tail_10._query_compiler._operations._to_es_query())
head_8 = tail_10.head(8)
print(head_8._query_compiler._operations._to_es_query())
head_5 = head_8.head(5)
print(head_5._query_compiler._operations._to_es_query())
tail_4 = head_5.tail(4)
print(tail_4._query_compiler._operations._to_es_query())
def test_head_tail_print(self):
ed_flights = ed.read_es(es_params='localhost', index_pattern='flights')
tail_100 = ed_flights.tail(100)
print(tail_100._query_compiler._operations._to_es_query())
print(tail_100)
head_10 = tail_100.head(10)
print(head_10)
tail_4 = head_10.tail(4)
print(tail_4._query_compiler._operations._to_es_query())
print(tail_4)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,32 +0,0 @@
# File called _pytest for PyCharm compatibility
from eland.tests.common import TestData
import pandas as pd
import eland as ed
import io
from eland.tests import ELASTICSEARCH_HOST
from eland.tests import FLIGHTS_INDEX_NAME
from pandas.util.testing import (
assert_series_equal, assert_frame_equal)
class TestSeriesBasics(TestData):
def test_head_tail(self):
pd_s = self.pd_flights()['Carrier']
ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier')
pd_s_head = pd_s.head(10)
ed_s_head = ed_s.head(10)
assert_series_equal(pd_s_head, ed_s_head)
pd_s_tail = pd_s.tail(10)
ed_s_tail = ed_s.tail(10)
assert_series_equal(pd_s_tail, ed_s_tail)
def test_print(self):
ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'timestamp')
print(ed_s.to_string())

View File

@ -1,69 +0,0 @@
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from eland.tests import *
DATA_LIST = [
(FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, FLIGHTS_MAPPING),
(ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME, ECOMMERCE_MAPPING)
]
def _setup_data(es):
# Read json file and index records into Elasticsearch
for data in DATA_LIST:
json_file_name = data[0]
index_name = data[1]
mapping = data[2]
# Delete index
print("Deleting index:", index_name)
es.indices.delete(index=index_name, ignore=[400, 404])
print("Creating index:", index_name)
es.indices.create(index=index_name, body=mapping)
df = pd.read_json(json_file_name, lines=True)
actions = []
n = 0
print("Adding", df.shape[0], "items to index:", index_name)
for index, row in df.iterrows():
values = row.to_dict()
# make timestamp datetime 2018-01-01T12:09:35
#values['timestamp'] = datetime.strptime(values['timestamp'], '%Y-%m-%dT%H:%M:%S')
# Use integer as id field for repeatable results
action = {'_index': index_name, '_source': values, '_id': str(n)}
actions.append(action)
n = n + 1
if n % 10000 == 0:
helpers.bulk(es, actions)
actions = []
helpers.bulk(es, actions)
actions = []
print("Done", index_name)
def _setup_test_mappings(es):
# Create a complex mapping containing many Elasticsearch features
es.indices.delete(index=TEST_MAPPING1_INDEX_NAME, ignore=[400, 404])
es.indices.create(index=TEST_MAPPING1_INDEX_NAME, body=TEST_MAPPING1)
def _setup_test_nested(es):
es.indices.delete(index=TEST_NESTED_USER_GROUP_INDEX_NAME, ignore=[400, 404])
es.indices.create(index=TEST_NESTED_USER_GROUP_INDEX_NAME, body=TEST_NESTED_USER_GROUP_MAPPING)
helpers.bulk(es, TEST_NESTED_USER_GROUP_DOCS)
if __name__ == '__main__':
# Create connection to Elasticsearch - use defaults
es = Elasticsearch(ELASTICSEARCH_HOST)
_setup_data(es)
_setup_test_mappings(es)
_setup_test_nested(es)

File diff suppressed because it is too large

View File

@ -2,9 +2,11 @@ from eland import Client
from eland import DataFrame
from eland import Mappings
def read_es(es_params, index_pattern):
return DataFrame(client=es_params, index_pattern=index_pattern)
def pandas_to_es(df, es_params, destination_index, if_exists='fail', chunk_size=10000, refresh=False):
"""
Append a pandas DataFrame to an Elasticsearch index.
@ -45,8 +47,8 @@ def pandas_to_es(df, es_params, destination_index, if_exists='fail', chunk_size=
elif if_exists == "replace":
client.indices().delete(destination_index)
client.indices().create(destination_index, mapping)
#elif if_exists == "append":
# TODO validate mapping is compatible
# elif if_exists == "append":
# TODO validate mapping is compatible
else:
client.indices().create(destination_index, mapping)
@ -70,4 +72,3 @@ def pandas_to_es(df, es_params, destination_index, if_exists='fail', chunk_size=
actions = []
client.bulk(actions, refresh=refresh)
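# A minimal usage sketch for the two helpers above, assuming a local
# Elasticsearch and a throwaway index name (neither is part of this module):
#
#   import pandas as pd
#   import eland as ed
#
#   df = pd.DataFrame({'a': [1.0, 2.0, 3.0], 'b': ['x', 'y', 'z']})
#   ed.pandas_to_es(df, 'localhost', 'eland_usage_example',
#                   if_exists='replace', refresh=True)
#   ed_df = ed.read_es('localhost', 'eland_usage_example')
#   print(ed_df.head())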