Mirror of https://github.com/elastic/eland.git (synced 2025-07-11 00:02:14 +08:00)

Commit f2df2c77b2: Merge pull request #14 from stevedodson/master

    Introduction of eland.Series - big refactor
eland/__init__.py
@@ -1,5 +1,7 @@
-from .utils import *
-from .dataframe import *
 from .client import *
+from .ndframe import *
 from .index import *
+from .mappings import *
+from .series import *
+from .dataframe import *
+from .utils import *
-from .mappings import *
eland/client.py
@@ -1,4 +1,5 @@
 from elasticsearch import Elasticsearch
+from elasticsearch import helpers

 class Client():
     """
@@ -18,6 +19,12 @@ class Client():
     def indices(self):
         return self.es.indices

+    def bulk(self, actions, refresh=False):
+        return helpers.bulk(self.es, actions, refresh=refresh)
+
+    def scan(self, **kwargs):
+        return helpers.scan(self.es, **kwargs)
+
     def search(self, **kwargs):
         return self.es.search(**kwargs)
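Note: `bulk` and `scan` are thin pass-throughs to the standard elasticsearch-py helpers, so callers get chunked writes and scroll-based reads without paging manually. A minimal sketch of how the new delegates are used (the host and the 'flights' index are illustrative, not part of this commit):

```python
from elasticsearch import Elasticsearch
import eland as ed

client = ed.Client(Elasticsearch('localhost'))

# scan wraps helpers.scan: a generator that scrolls through every matching doc
for hit in client.scan(index='flights', query={"query": {"match_all": {}}}):
    print(hit['_source'])
```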
eland/dataframe.py
@@ -26,18 +26,23 @@ only Elasticsearch aggregatable fields can be aggregated or grouped.
 import sys

 import pandas as pd
-from elasticsearch_dsl import Search
-from pandas.compat import StringIO
-from pandas.core import common as com
-from pandas.io.common import _expand_user, _stringify_path
 from pandas.io.formats import format as fmt
 from pandas.io.formats.printing import pprint_thing
+from pandas.compat import StringIO
+from pandas.io.common import _expand_user, _stringify_path
 from pandas.io.formats import console
+from pandas.core import common as com

-import eland as ed
+from eland import NDFrame
+from eland import Index
+from eland import Series


-class DataFrame():
+class DataFrame(NDFrame):
     """
     pandas.DataFrame like API that proxies into Elasticsearch index(es).
@@ -49,9 +54,6 @@ class DataFrame():
     index_pattern : str
         An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).

-    operations: list of operation
-        A list of Elasticsearch analytics operations e.g. filter, aggregations etc.
-
     See Also
     --------
@@ -83,229 +85,14 @@ class DataFrame():
                  index_pattern,
                  mappings=None,
                  index_field=None):
-        self._client = ed.Client(client)
-        self._index_pattern = index_pattern
-
-        # Get and persist mappings, this allows us to correctly
-        # map returned types from Elasticsearch to pandas datatypes
-        if mappings is None:
-            self._mappings = ed.Mappings(self._client, self._index_pattern)
-        else:
-            self._mappings = mappings
-
-        self._index = ed.Index(index_field)
-
-    def _es_results_to_pandas(self, results):
-        """
-        Parameters
-        ----------
-        results: dict
-            Elasticsearch results from self.client.search
-
-        Returns
-        -------
-        df: pandas.DataFrame
-            _source values extracted from results and mapped to pandas DataFrame
-            dtypes are mapped via Mapping object
-
-        Notes
-        -----
-        Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame
-        For example, an index with mapping:
-        ```
-        "mappings" : {
-          "properties" : {
-            "group" : {
-              "type" : "keyword"
-            },
-            "user" : {
-              "type" : "nested",
-              "properties" : {
-                "first" : {
-                  "type" : "keyword"
-                },
-                "last" : {
-                  "type" : "keyword"
-                }
-              }
-            }
-          }
-        }
-        ```
-        Adding a document:
-        ```
-        "_source" : {
-          "group" : "amsterdam",
-          "user" : [
-            {
-              "first" : "John",
-              "last" : "Smith"
-            },
-            {
-              "first" : "Alice",
-              "last" : "White"
-            }
-          ]
-        }
-        ```
-        (https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html)
-        this would be transformed internally (in Elasticsearch) into a document that looks more like this:
-        ```
-        {
-          "group" : "amsterdam",
-          "user.first" : [ "alice", "john" ],
-          "user.last" : [ "smith", "white" ]
-        }
-        ```
-        When mapping this a pandas data frame we mimic this transformation.
-
-        Similarly, if a list is added to Elasticsearch:
-        ```
-        PUT my_index/_doc/1
-        {
-          "list" : [
-            0, 1, 2
-          ]
-        }
-        ```
-        The mapping is:
-        ```
-        "mappings" : {
-          "properties" : {
-            "user" : {
-              "type" : "long"
-            }
-          }
-        }
-        ```
-        TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html)
-        TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
-        NOTE - using this lists is generally not a good way to use this API
-        """
-
-        def flatten_dict(y):
-            out = {}
-
-            def flatten(x, name=''):
-                # We flatten into source fields e.g. if type=geo_point
-                # location: {lat=52.38, lon=4.90}
-                if name == '':
-                    is_source_field = False
-                    pd_dtype = 'object'
-                else:
-                    is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(name[:-1])
-
-                if not is_source_field and type(x) is dict:
-                    for a in x:
-                        flatten(x[a], name + a + '.')
-                elif not is_source_field and type(x) is list:
-                    for a in x:
-                        flatten(a, name)
-                elif is_source_field == True:  # only print source fields from mappings (TODO - not so efficient for large number of fields and filtered mapping)
-                    field_name = name[:-1]
-
-                    # Coerce types - for now just datetime
-                    if pd_dtype == 'datetime64[ns]':
-                        x = pd.to_datetime(x)
-
-                    # Elasticsearch can have multiple values for a field. These are represented as lists, so
-                    # create lists for this pivot (see notes above)
-                    if field_name in out:
-                        if type(out[field_name]) is not list:
-                            l = [out[field_name]]
-                            out[field_name] = l
-                        out[field_name].append(x)
-                    else:
-                        out[field_name] = x
-
-            flatten(y)
-
-            return out
-
-        rows = []
-        index = []
-        for hit in results['hits']['hits']:
-            row = hit['_source']
-
-            # get index value - can be _id or can be field value in source
-            if self._index.is_source_field:
-                index_field = row[self._index.index_field]
-            else:
-                index_field = hit[self._index.index_field]
-            index.append(index_field)
-
-            # flatten row to map correctly to 2D DataFrame
-            rows.append(flatten_dict(row))
-
-        # Create pandas DataFrame
-        df = pd.DataFrame(data=rows, index=index)
-
-        # _source may not contain all columns in the mapping
-        # therefore, fill in missing columns
-        # (note this returns self.columns NOT IN df.columns)
-        missing_columns = list(set(self.columns) - set(df.columns))
-
-        for missing in missing_columns:
-            is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
-            df[missing] = None
-            df[missing].astype(pd_dtype)
-
-        # Sort columns in mapping order
-        df = df[self.columns]
-
-        return df
+        # python 3 syntax
+        super().__init__(client, index_pattern, mappings=mappings, index_field=index_field)

     def head(self, n=5):
-        sort_params = self._index.sort_field + ":asc"
-
-        results = self._client.search(index=self._index_pattern, size=n, sort=sort_params)
-
-        return self._es_results_to_pandas(results)
+        return super()._head(n)

     def tail(self, n=5):
-        sort_params = self._index.sort_field + ":desc"
-
-        results = self._client.search(index=self._index_pattern, size=n, sort=sort_params)
-
-        df = self._es_results_to_pandas(results)
-
-        # reverse order (index ascending)
-        return df.sort_index()
+        return super()._tail(n)

-    def describe(self):
-        numeric_source_fields = self._mappings.numeric_source_fields()
-
-        # for each field we compute:
-        # count, mean, std, min, 25%, 50%, 75%, max
-        search = Search(using=self._client, index=self._index_pattern).extra(size=0)
-
-        for field in numeric_source_fields:
-            search.aggs.metric('extended_stats_' + field, 'extended_stats', field=field)
-            search.aggs.metric('percentiles_' + field, 'percentiles', field=field)
-
-        response = search.execute()
-
-        results = {}
-
-        for field in numeric_source_fields:
-            values = []
-            values.append(response.aggregations['extended_stats_' + field]['count'])
-            values.append(response.aggregations['extended_stats_' + field]['avg'])
-            values.append(response.aggregations['extended_stats_' + field]['std_deviation'])
-            values.append(response.aggregations['extended_stats_' + field]['min'])
-            values.append(response.aggregations['percentiles_' + field]['values']['25.0'])
-            values.append(response.aggregations['percentiles_' + field]['values']['50.0'])
-            values.append(response.aggregations['percentiles_' + field]['values']['75.0'])
-            values.append(response.aggregations['extended_stats_' + field]['max'])
-
-            # if not None
-            if (values.count(None) < len(values)):
-                results[field] = values
-
-        df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])
-
-        return df
-
     def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
              null_counts=None):
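Note: `head`/`tail` now just delegate to the shared `NDFrame._head`/`_tail` (shown in full in the new eland/ndframe.py below), which issue sorted, size-limited searches. A hedged sketch of the equivalent raw requests (index name and sort field are illustrative):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch()

# head(5): first five docs by the index field
head_results = es.search(index='flights', size=5, sort='_id:asc')

# tail(5): last five docs, fetched descending and re-sorted ascending afterwards
tail_results = es.search(index='flights', size=5, sort='_id:desc')
```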
@@ -325,7 +112,7 @@ class DataFrame():
         lines = []

         lines.append(str(type(self)))
-        lines.append(self.index_summary())
+        lines.append(self._index_summary())

         if len(self.columns) == 0:
             lines.append('Empty {name}'.format(name=type(self).__name__))
@@ -435,20 +222,12 @@ class DataFrame():

         return num_rows, num_columns

-    @property
-    def columns(self):
-        return pd.Index(self._mappings.source_fields())
-
-    @property
-    def index(self):
-        return self._index
-
     def set_index(self, index_field):
         copy = self.copy()
-        copy._index = ed.Index(index_field)
+        copy._index = Index(index_field)
         return copy

-    def index_summary(self):
+    def _index_summary(self):
         head = self.head(1).index[0]
         tail = self.tail(1).index[0]
         index_summary = ', %s to %s' % (pprint_thing(head),
@@ -457,13 +236,6 @@ class DataFrame():
         name = "Index"
         return '%s: %s entries%s' % (name, len(self), index_summary)

-    @property
-    def dtypes(self):
-        return self._mappings.dtypes()
-
-    def get_dtype_counts(self):
-        return self._mappings.get_dtype_counts()
-
     def count(self):
         """
         Count non-NA cells for each column (TODO row)
@@ -487,29 +259,13 @@ class DataFrame():

         return count

-    def index_count(self):
-        """
-        Returns
-        -------
-        index_count: int
-            Count of docs where index_field exists
-        """
-        exists_query = {"query": {"exists": {"field": self._index.index_field}}}
-
-        index_count = self._client.count(index=self._index_pattern, body=exists_query)
-
-        return index_count
-
-    def _filter_by_columns(self, columns):
-        # Return new eland.DataFrame with modified mappings
-        mappings = ed.Mappings(mappings=self._mappings, columns=columns)
-
-        return DataFrame(self._client, self._index_pattern, mappings=mappings)
+    def describe(self):
+        return super()._describe()

     def __getitem__(self, key):
         # NOTE: there is a difference between pandas here.
         # e.g. df['a'] returns pd.Series, df[['a','b']] return pd.DataFrame
-        # we always return DataFrame - TODO maybe create eland.Series at some point...

         # Implementation mainly copied from pandas v0.24.2
         # (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html)
@@ -535,26 +291,36 @@ class DataFrame():

         # We are left with two options: a single key, and a collection of keys,
         columns = []
+        is_single_key = False
         if isinstance(key, str):
             if not self._mappings.is_source_field(key):
                 raise TypeError('Column does not exist: [{0}]'.format(key))
             columns.append(key)
+            is_single_key = True
         elif isinstance(key, list):
             columns.extend(key)
         else:
             raise TypeError('__getitem__ arguments invalid: [{0}]'.format(key))

-        return self._filter_by_columns(columns)
+        mappings = self._filter_mappings(columns)

-    def __len__(self):
-        """
-        Returns length of info axis, but here we use the index.
-        """
-        return self._client.count(index=self._index_pattern)
+        # Return new eland.DataFrame with modified mappings
+        if is_single_key:
+            return Series(self._client, self._index_pattern, mappings=mappings)
+        else:
+            return DataFrame(self._client, self._index_pattern, mappings=mappings)
+
+    def __getattr__(self, name):
+        # Note: obj.x will always call obj.__getattribute__('x') prior to
+        # calling obj.__getattr__('x').
+        mappings = self._filter_mappings([name])
+
+        return Series(self._client, self._index_pattern, mappings=mappings)

     def copy(self):
         # TODO - test and validate...may need deep copying
-        return ed.DataFrame(self._client,
+        return DataFrame(self._client,
                          self._index_pattern,
                          self._mappings,
                          self._index)
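Note: this hunk is the behavioural heart of the refactor: `__getitem__` now distinguishes a single key from a list of keys, and `__getattr__` adds pandas-style attribute access. A hedged usage sketch (host and field names are illustrative):

```python
import eland as ed

df = ed.DataFrame('localhost', 'flights')

s1 = df['Carrier']             # single key -> eland.Series
s2 = df.Carrier                # attribute access -> eland.Series via __getattr__
sub = df[['Dest', 'Carrier']]  # list of keys -> eland.DataFrame
```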
@@ -590,16 +356,14 @@ class DataFrame():
         if max_rows == None:
             max_rows = pd.get_option('display.max_rows')

-        sdf = self.__fake_dataframe__(max_rows=max_rows+1)
+        df = self._fake_head_tail_df(max_rows=max_rows+1)

-        _show_dimensions = show_dimensions
-
         if buf is not None:
             _buf = _expand_user(_stringify_path(buf))
         else:
             _buf = StringIO()

-        sdf.to_string(buf=_buf, columns=columns,
+        df.to_string(buf=_buf, columns=columns,
                       col_space=col_space, na_rep=na_rep,
                       formatters=formatters,
                       float_format=float_format,
|
|||||||
decimal=decimal,
|
decimal=decimal,
|
||||||
line_width=line_width)
|
line_width=line_width)
|
||||||
|
|
||||||
if _show_dimensions:
|
# Our fake dataframe has incorrect number of rows (max_rows*2+1) - write out
|
||||||
|
# the correct number of rows
|
||||||
|
if show_dimensions:
|
||||||
_buf.write("\n\n[{nrows} rows x {ncols} columns]"
|
_buf.write("\n\n[{nrows} rows x {ncols} columns]"
|
||||||
.format(nrows=self.index_count(), ncols=len(self.columns)))
|
.format(nrows=self._index_count(), ncols=len(self.columns)))
|
||||||
|
|
||||||
if buf is None:
|
if buf is None:
|
||||||
result = _buf.getvalue()
|
result = _buf.getvalue()
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
def to_pandas(selfs):
|
||||||
def __fake_dataframe__(self, max_rows=1):
|
return super()._to_pandas()
|
||||||
head_rows = int(max_rows / 2) + max_rows % 2
|
|
||||||
tail_rows = max_rows - head_rows
|
|
||||||
|
|
||||||
head = self.head(head_rows)
|
|
||||||
tail = self.tail(tail_rows)
|
|
||||||
|
|
||||||
num_rows = len(self)
|
|
||||||
|
|
||||||
if (num_rows > max_rows):
|
|
||||||
# If we have a lot of rows, create a SparseDataFrame and use
|
|
||||||
# pandas to_string logic
|
|
||||||
# NOTE: this sparse DataFrame can't be used as the middle
|
|
||||||
# section is all NaNs. However, it gives us potentially a nice way
|
|
||||||
# to use the pandas IO methods.
|
|
||||||
# TODO - if data is indexed by time series, return top/bottom of
|
|
||||||
# time series, rather than first max_rows items
|
|
||||||
"""
|
|
||||||
if tail_rows > 0:
|
|
||||||
locations = [0, num_rows - tail_rows]
|
|
||||||
lengths = [head_rows, tail_rows]
|
|
||||||
else:
|
|
||||||
locations = [0]
|
|
||||||
lengths = [head_rows]
|
|
||||||
|
|
||||||
sdf = pd.DataFrame({item: pd.SparseArray(data=head[item],
|
|
||||||
sparse_index=
|
|
||||||
BlockIndex(
|
|
||||||
num_rows, locations, lengths))
|
|
||||||
for item in self.columns})
|
|
||||||
"""
|
|
||||||
return pd.concat([head, tail])
|
|
||||||
|
|
||||||
|
|
||||||
return pd.concat([head, tail])
|
|
||||||
|
|
||||||
|
|
||||||
# From pandas.DataFrame
|
# From pandas.DataFrame
|
||||||
def _put_str(s, space):
|
def _put_str(s, space):
|
||||||
|
eland/mappings.py
@@ -2,6 +2,7 @@ import warnings

 import pandas as pd

+from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype, is_string_dtype)

 class Mappings():
     """
@@ -217,6 +218,7 @@ class Mappings():

         return capability_matrix_df.sort_index()

+    @staticmethod
     def _es_dtype_to_pd_dtype(es_dtype):
         """
         Mapping Elasticsearch types to pandas dtypes
@@ -259,6 +261,84 @@ class Mappings():
         # Return 'object' for all unsupported TODO - investigate how different types could be supported
         return 'object'

+    @staticmethod
+    def _pd_dtype_to_es_dtype(pd_dtype):
+        """
+        Mapping pandas dtypes to Elasticsearch dtype
+        --------------------------------------------
+
+        ```
+        Pandas dtype   Python type  NumPy type                                                       Usage
+        object         str          string_, unicode_                                                Text
+        int64          int          int_, int8, int16, int32, int64, uint8, uint16, uint32, uint64  Integer numbers
+        float64        float        float_, float16, float32, float64                               Floating point numbers
+        bool           bool         bool_                                                           True/False values
+        datetime64     NA           datetime64[ns]                                                  Date and time values
+        timedelta[ns]  NA           NA                                                              Differences between two datetimes
+        category       NA           NA                                                              Finite list of text values
+        ```
+        """
+        es_dtype = None
+
+        # Map all to 64-bit - TODO map to specifics: int32 -> int etc.
+        if is_float_dtype(pd_dtype):
+            es_dtype = 'double'
+        elif is_integer_dtype(pd_dtype):
+            es_dtype = 'long'
+        elif is_bool_dtype(pd_dtype):
+            es_dtype = 'boolean'
+        elif is_string_dtype(pd_dtype):
+            es_dtype = 'keyword'
+        elif is_datetime_or_timedelta_dtype(pd_dtype):
+            es_dtype = 'date'
+        else:
+            warnings.warn('No mapping for pd_dtype: [{0}], using default mapping'.format(pd_dtype))
+
+        return es_dtype
+
+    @staticmethod
+    def _generate_es_mappings(dataframe):
+        """Given a pandas dataframe, generate the associated Elasticsearch mapping
+
+        Parameters
+        ----------
+        dataframe : pandas.DataFrame
+            pandas.DataFrame to create schema from
+
+        Returns
+        -------
+        mapping : str
+        """
+
+        """
+        "mappings" : {
+          "properties" : {
+            "AvgTicketPrice" : {
+              "type" : "float"
+            },
+            "Cancelled" : {
+              "type" : "boolean"
+            },
+            "Carrier" : {
+              "type" : "keyword"
+            },
+            "Dest" : {
+              "type" : "keyword"
+            }
+          }
+        }
+        """
+
+        mappings = {}
+        mappings['properties'] = {}
+        for column_name, dtype in dataframe.dtypes.iteritems():
+            es_dtype = Mappings._pd_dtype_to_es_dtype(dtype)
+
+            mappings['properties'][column_name] = {}
+            mappings['properties'][column_name]['type'] = es_dtype
+
+        return {"mappings": mappings}
+
     def all_fields(self):
         """
         Returns
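Note: `_pd_dtype_to_es_dtype` maps everything to the widest Elasticsearch type (double/long), and `_generate_es_mappings` applies it column by column. A sketch of the expected output, consistent with the new test further below:

```python
import pandas as pd
import eland as ed

df = pd.DataFrame({'price': [1.5, 2.5], 'count': [1, 2],
                   'name': ['a', 'b'], 'sold': [True, False]})

print(ed.Mappings._generate_es_mappings(df))
# {'mappings': {'properties': {'price': {'type': 'double'},
#                              'count': {'type': 'long'},
#                              'name': {'type': 'keyword'},
#                              'sold': {'type': 'boolean'}}}}
```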
@@ -379,3 +459,14 @@ class Mappings():
         """
         return pd.Series(self._mappings_capabilities[self._mappings_capabilities._source == True].groupby('pd_dtype')[
                              '_source'].count().to_dict())
+
+    def to_pandas(self):
+        """
+
+        Returns
+        -------
+        df : pd.DataFrame
+            pandas DataFrame representing this index
+        """
eland/ndframe.py (new file, 371 lines)
@@ -0,0 +1,371 @@
"""
NDFrame
---------
Base class for eland.DataFrame and eland.Series.

The underlying data resides in Elasticsearch and the API aligns as much as
possible with pandas APIs.

This allows the eland.DataFrame to access large datasets stored in Elasticsearch,
without storing the dataset in local memory.

Implementation Details
----------------------

Elasticsearch indexes can be configured in many different ways, and these indexes
utilise different data structures to pandas.

eland.DataFrame operations that return individual rows (e.g. df.head()) return
_source data. If _source is not enabled, this data is not accessible.

Similarly, only Elasticsearch searchable fields can be searched or filtered, and
only Elasticsearch aggregatable fields can be aggregated or grouped.

"""
import pandas as pd
import functools
from elasticsearch_dsl import Search

import eland as ed

from pandas.core.generic import NDFrame as pd_NDFrame
from pandas._libs import Timestamp, iNaT, properties


class NDFrame():
    """
    pandas.DataFrame/Series like API that proxies into Elasticsearch index(es).

    Parameters
    ----------
    client : eland.Client
        A reference to a Elasticsearch python client

    index_pattern : str
        An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).

    See Also
    --------

    """
    def __init__(self,
                 client,
                 index_pattern,
                 mappings=None,
                 index_field=None):

        self._client = ed.Client(client)
        self._index_pattern = index_pattern

        # Get and persist mappings, this allows us to correctly
        # map returned types from Elasticsearch to pandas datatypes
        if mappings is None:
            self._mappings = ed.Mappings(self._client, self._index_pattern)
        else:
            self._mappings = mappings

        self._index = ed.Index(index_field)

    def _es_results_to_pandas(self, results):
        """
        Parameters
        ----------
        results: dict
            Elasticsearch results from self.client.search

        Returns
        -------
        df: pandas.DataFrame
            _source values extracted from results and mapped to pandas DataFrame
            dtypes are mapped via Mapping object

        Notes
        -----
        Fields containing lists in Elasticsearch don't map easily to pandas.DataFrame
        For example, an index with mapping:
        ```
        "mappings" : {
          "properties" : {
            "group" : {
              "type" : "keyword"
            },
            "user" : {
              "type" : "nested",
              "properties" : {
                "first" : {
                  "type" : "keyword"
                },
                "last" : {
                  "type" : "keyword"
                }
              }
            }
          }
        }
        ```
        Adding a document:
        ```
        "_source" : {
          "group" : "amsterdam",
          "user" : [
            {
              "first" : "John",
              "last" : "Smith"
            },
            {
              "first" : "Alice",
              "last" : "White"
            }
          ]
        }
        ```
        (https://www.elastic.co/guide/en/elasticsearch/reference/current/nested.html)
        this would be transformed internally (in Elasticsearch) into a document that looks more like this:
        ```
        {
          "group" : "amsterdam",
          "user.first" : [ "alice", "john" ],
          "user.last" : [ "smith", "white" ]
        }
        ```
        When mapping this to a pandas data frame we mimic this transformation.

        Similarly, if a list is added to Elasticsearch:
        ```
        PUT my_index/_doc/1
        {
          "list" : [
            0, 1, 2
          ]
        }
        ```
        The mapping is:
        ```
        "mappings" : {
          "properties" : {
            "user" : {
              "type" : "long"
            }
          }
        }
        ```
        TODO - explain how lists are handled (https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html)
        TODO - an option here is to use Elasticsearch's multi-field matching instead of pandas treatment of lists (which isn't great)
        NOTE - using lists like this is generally not a good way to use this API
        """
        def flatten_dict(y):
            out = {}

            def flatten(x, name=''):
                # We flatten into source fields e.g. if type=geo_point
                # location: {lat=52.38, lon=4.90}
                if name == '':
                    is_source_field = False
                    pd_dtype = 'object'
                else:
                    is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(name[:-1])

                if not is_source_field and type(x) is dict:
                    for a in x:
                        flatten(x[a], name + a + '.')
                elif not is_source_field and type(x) is list:
                    for a in x:
                        flatten(a, name)
                elif is_source_field == True:  # only print source fields from mappings (TODO - not so efficient for large number of fields and filtered mapping)
                    field_name = name[:-1]

                    # Coerce types - for now just datetime
                    if pd_dtype == 'datetime64[ns]':
                        x = pd.to_datetime(x)

                    # Elasticsearch can have multiple values for a field. These are represented as lists, so
                    # create lists for this pivot (see notes above)
                    if field_name in out:
                        if type(out[field_name]) is not list:
                            l = [out[field_name]]
                            out[field_name] = l
                        out[field_name].append(x)
                    else:
                        out[field_name] = x

            flatten(y)

            return out

        rows = []
        index = []
        if isinstance(results, dict):
            iterator = results['hits']['hits']
        else:
            iterator = results

        for hit in iterator:
            row = hit['_source']

            # get index value - can be _id or can be field value in source
            if self._index.is_source_field:
                index_field = row[self._index.index_field]
            else:
                index_field = hit[self._index.index_field]
            index.append(index_field)

            # flatten row to map correctly to 2D DataFrame
            rows.append(flatten_dict(row))

        # Create pandas DataFrame
        df = pd.DataFrame(data=rows, index=index)

        # _source may not contain all columns in the mapping
        # therefore, fill in missing columns
        # (note this returns self.columns NOT IN df.columns)
        missing_columns = list(set(self._columns) - set(df.columns))

        for missing in missing_columns:
            is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
            df[missing] = None
            df[missing].astype(pd_dtype)

        # Sort columns in mapping order
        df = df[self._columns]

        return df
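Note: a self-contained sketch of the flattening `_es_results_to_pandas` performs on nested `_source` docs. Mapping lookups are stubbed out here (every leaf is treated as a source field):

```python
def flatten_dict(y):
    out = {}

    def flatten(x, name=''):
        if isinstance(x, dict):
            for a in x:
                flatten(x[a], name + a + '.')
        elif isinstance(x, list):
            for a in x:
                flatten(a, name)
        else:
            field_name = name[:-1]
            # repeated fields pivot into lists, mirroring Elasticsearch
            if field_name in out:
                if not isinstance(out[field_name], list):
                    out[field_name] = [out[field_name]]
                out[field_name].append(x)
            else:
                out[field_name] = x

    flatten(y)
    return out

doc = {"group": "amsterdam",
       "user": [{"first": "John", "last": "Smith"},
                {"first": "Alice", "last": "White"}]}

print(flatten_dict(doc))
# {'group': 'amsterdam', 'user.first': ['John', 'Alice'], 'user.last': ['Smith', 'White']}
```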
    def _head(self, n=5):
        """
        Protected method that returns head as pandas.DataFrame.

        Returns
        -------
        _head
            pandas.DataFrame of top N values
        """
        sort_params = self._index.sort_field + ":asc"

        results = self._client.search(index=self._index_pattern, size=n, sort=sort_params)

        return self._es_results_to_pandas(results)

    def _tail(self, n=5):
        """
        Protected method that returns tail as pandas.DataFrame.

        Returns
        -------
        _tail
            pandas.DataFrame of last N values
        """
        sort_params = self._index.sort_field + ":desc"

        results = self._client.search(index=self._index_pattern, size=n, sort=sort_params)

        df = self._es_results_to_pandas(results)

        # reverse order (index ascending)
        return df.sort_index()

    def _to_pandas(self):
        """
        Protected method that returns all data as pandas.DataFrame.

        Returns
        -------
        df
            pandas.DataFrame of all values
        """
        sort_params = self._index.sort_field + ":asc"

        results = self._client.scan(index=self._index_pattern)

        # We sort here rather than in scan - once everything is in core this
        # should be faster
        return self._es_results_to_pandas(results)
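Note: unlike `_head`/`_tail`, `_to_pandas` streams every document through the scroll API via the new `Client.scan`, so memory use scales with the whole index. Roughly equivalent raw calls (a sketch; the index name is illustrative):

```python
from elasticsearch import Elasticsearch, helpers
import pandas as pd

es = Elasticsearch()

# helpers.scan scrolls through all matching docs without a size limit
rows = [hit['_source'] for hit in helpers.scan(es, index='flights')]

df = pd.DataFrame(rows).sort_index()
```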
    def _describe(self):
        numeric_source_fields = self._mappings.numeric_source_fields()

        # for each field we compute:
        # count, mean, std, min, 25%, 50%, 75%, max
        search = Search(using=self._client, index=self._index_pattern).extra(size=0)

        for field in numeric_source_fields:
            search.aggs.metric('extended_stats_' + field, 'extended_stats', field=field)
            search.aggs.metric('percentiles_' + field, 'percentiles', field=field)

        response = search.execute()

        results = {}

        for field in numeric_source_fields:
            values = list()
            values.append(response.aggregations['extended_stats_' + field]['count'])
            values.append(response.aggregations['extended_stats_' + field]['avg'])
            values.append(response.aggregations['extended_stats_' + field]['std_deviation'])
            values.append(response.aggregations['extended_stats_' + field]['min'])
            values.append(response.aggregations['percentiles_' + field]['values']['25.0'])
            values.append(response.aggregations['percentiles_' + field]['values']['50.0'])
            values.append(response.aggregations['percentiles_' + field]['values']['75.0'])
            values.append(response.aggregations['extended_stats_' + field]['max'])

            # if not None
            if values.count(None) < len(values):
                results[field] = values

        df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max'])

        return df

    def _filter_mappings(self, columns):
        mappings = ed.Mappings(mappings=self._mappings, columns=columns)

        return mappings
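Note: `_describe` computes all eight summary statistics server-side in a single size-0 search. For one numeric field the generated request body looks roughly like this (a sketch; the field name is illustrative):

```python
body = {
    "size": 0,
    "aggs": {
        "extended_stats_AvgTicketPrice": {
            "extended_stats": {"field": "AvgTicketPrice"}
        },
        "percentiles_AvgTicketPrice": {
            "percentiles": {"field": "AvgTicketPrice"}
        }
    }
}
# extended_stats supplies count/avg/std_deviation/min/max; percentiles
# supplies the 25.0/50.0/75.0 values read back above.
```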
    @property
    def columns(self):
        return self._columns

    @property
    def index(self):
        return self._index

    @property
    def dtypes(self):
        return self._mappings.dtypes()

    @property
    def _columns(self):
        return pd.Index(self._mappings.source_fields())

    def get_dtype_counts(self):
        return self._mappings.get_dtype_counts()

    def _index_count(self):
        """
        Returns
        -------
        index_count: int
            Count of docs where index_field exists
        """
        exists_query = {"query": {"exists": {"field": self._index.index_field}}}

        index_count = self._client.count(index=self._index_pattern, body=exists_query)

        return index_count

    def __len__(self):
        """
        Returns length of info axis, but here we use the index.
        """
        return self._client.count(index=self._index_pattern)

    def _fake_head_tail_df(self, max_rows=1):
        """
        Create a 'fake' pd.DataFrame of the entire ed.DataFrame
        by concat head and tail. Used for display.
        """
        head_rows = int(max_rows / 2) + max_rows % 2
        tail_rows = max_rows - head_rows

        head = self._head(head_rows)
        tail = self._tail(tail_rows)

        return head.append(tail)
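Note: `_fake_head_tail_df` keeps display cheap: only head and tail rows are ever fetched, whatever the index size. A sketch of the row budget:

```python
max_rows = 5
head_rows = int(max_rows / 2) + max_rows % 2  # 3 rows fetched from the front
tail_rows = max_rows - head_rows              # 2 rows fetched from the back
# head.append(tail) gives a 5-row frame for rendering; the true row count
# is reported separately via _index_count().
```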
eland/series.py (new file, 402 lines)
@@ -0,0 +1,402 @@
"""
Series
---------
One-dimensional ndarray with axis labels (including time series).

The underlying data resides in Elasticsearch and the API aligns as much as
possible with pandas.DataFrame API.

This allows the eland.Series to access large datasets stored in Elasticsearch,
without storing the dataset in local memory.

Implementation Details
----------------------
Based on NDFrame which underpins eland.DataFrame

"""
import sys

import pandas as pd
import pandas.compat as compat
from pandas.compat import StringIO
from pandas.core.dtypes.common import (
    is_categorical_dtype)
from pandas.io.formats import format as fmt
from pandas.io.formats.printing import pprint_thing

from eland import Index
from eland import NDFrame


class Series(NDFrame):
    """
    pandas.Series like API that proxies into Elasticsearch index(es).

    Parameters
    ----------
    client : eland.Client
        A reference to a Elasticsearch python client

    index_pattern : str
        An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).

    field_name : str
        The field to base the series on

    See Also
    --------

    Examples
    --------

    import eland as ed
    client = ed.Client(Elasticsearch())
    s = ed.DataFrame(client, 'reviews', 'date')
    df.head()
       reviewerId  vendorId  rating              date
    0           0         0       5  2006-04-07 17:08
    1           1         1       5  2006-05-04 12:16
    2           2         2       4  2006-04-21 12:26
    3           3         3       5  2006-04-18 15:48
    4           3         4       5  2006-04-18 15:49

    Notice that the types are based on Elasticsearch mappings

    Notes
    -----
    If the Elasticsearch index is deleted or index mappings are changed after this
    object is created, the object is not rebuilt and so inconsistencies can occur.

    """

    def __init__(self,
                 client,
                 index_pattern,
                 field_name=None,
                 mappings=None,
                 index_field=None):
        # python 3 syntax
        super().__init__(client, index_pattern, mappings=mappings, index_field=index_field)

        # now select column (field_name)
        if field_name is not None:
            self._mappings = self._filter_mappings([field_name])
        elif len(self._mappings.source_fields()) != 1:
            raise TypeError('Series must have 1 field: [{0}]'.format(len(self._mappings.source_fields())))

    def head(self, n=5):
        return self._df_to_series(super()._head(n))

    def tail(self, n=5):
        return self._df_to_series(super()._tail(n))

    def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
             null_counts=None):
        """
        Print a concise summary of a DataFrame.

        This method prints information about a DataFrame including
        the index dtype and column dtypes, non-null values and memory usage.

        This copies a lot of code from pandas.DataFrame.info as it is difficult
        to split out the appropriate code or creating a SparseDataFrame gives
        incorrect results on types and counts.
        """
        if buf is None:  # pragma: no cover
            buf = sys.stdout

        lines = []

        lines.append(str(type(self)))
        lines.append(self._index_summary())

        if len(self.columns) == 0:
            lines.append('Empty {name}'.format(name=type(self).__name__))
            fmt.buffer_put_lines(buf, lines)
            return

        cols = self.columns

        # hack
        if max_cols is None:
            max_cols = pd.get_option('display.max_info_columns',
                                     len(self.columns) + 1)

        max_rows = pd.get_option('display.max_info_rows', len(self) + 1)

        if null_counts is None:
            show_counts = ((len(self.columns) <= max_cols) and
                           (len(self) < max_rows))
        else:
            show_counts = null_counts
        exceeds_info_cols = len(self.columns) > max_cols

        def _verbose_repr():
            lines.append('Data columns (total %d columns):' %
                         len(self.columns))
            space = max(len(pprint_thing(k)) for k in self.columns) + 4
            counts = None

            tmpl = "{count}{dtype}"
            if show_counts:
                counts = self.count()
                if len(cols) != len(counts):  # pragma: no cover
                    raise AssertionError(
                        'Columns must equal counts '
                        '({cols:d} != {counts:d})'.format(
                            cols=len(cols), counts=len(counts)))
                tmpl = "{count} non-null {dtype}"

            dtypes = self.dtypes
            for i, col in enumerate(self._columns):
                dtype = dtypes.iloc[i]
                col = pprint_thing(col)

                count = ""
                if show_counts:
                    count = counts.iloc[i]

                lines.append(_put_str(col, space) + tmpl.format(count=count,
                                                                dtype=dtype))

        def _non_verbose_repr():
            lines.append(self._columns._summary(name='Columns'))

        def _sizeof_fmt(num, size_qualifier):
            # returns size in human readable format
            for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
                if num < 1024.0:
                    return ("{num:3.1f}{size_q} "
                            "{x}".format(num=num, size_q=size_qualifier, x=x))
                num /= 1024.0
            return "{num:3.1f}{size_q} {pb}".format(num=num,
                                                    size_q=size_qualifier,
                                                    pb='PB')

        if verbose:
            _verbose_repr()
        elif verbose is False:  # specifically set to False, not nesc None
            _non_verbose_repr()
        else:
            if exceeds_info_cols:
                _non_verbose_repr()
            else:
                _verbose_repr()

        counts = self.get_dtype_counts()
        dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k
                  in sorted(counts.items())]
        lines.append('dtypes: {types}'.format(types=', '.join(dtypes)))

        if memory_usage is None:
            memory_usage = pd.get_option('display.memory_usage')
        if memory_usage:
            # append memory usage of df to display
            size_qualifier = ''

            # TODO - this is different from pd.DataFrame as we shouldn't
            # really hold much in memory. For now just approximate with getsizeof + ignore deep
            mem_usage = sys.getsizeof(self)
            lines.append("memory usage: {mem}\n".format(
                mem=_sizeof_fmt(mem_usage, size_qualifier)))

        fmt.buffer_put_lines(buf, lines)

    @property
    def name(self):
        return list(self._mappings.source_fields())[0]

    @property
    def shape(self):
        """
        Return a tuple representing the dimensionality of the DataFrame.

        Returns
        -------
        shape: tuple
            0 - number of rows
            1 - number of columns
        """
        num_rows = len(self)
        num_columns = len(self._columns)

        return num_rows, num_columns

    @property
    def set_index(self, index_field):
        copy = self.copy()
        copy._index = Index(index_field)
        return copy

    def _index_summary(self):
        head = self.head(1).index[0]
        tail = self.tail(1).index[0]
        index_summary = ', %s to %s' % (pprint_thing(head),
                                        pprint_thing(tail))

        name = "Index"
        return '%s: %s entries%s' % (name, len(self), index_summary)

    def count(self):
        """
        Count non-NA cells for each column (TODO row)

        Counts are based on exists queries against ES

        This is inefficient, as it creates N queries (N is number of fields).

        An alternative approach is to use value_count aggregations. However, they have issues in that:
        1. They can only be used with aggregatable fields (e.g. keyword not text)
        2. For list fields they return multiple counts. E.g. tags=['elastic', 'ml'] returns value_count=2
           for a single document.
        """
        counts = {}
        for field in self._mappings.source_fields():
            exists_query = {"query": {"exists": {"field": field}}}
            field_exists_count = self._client.count(index=self._index_pattern, body=exists_query)
            counts[field] = field_exists_count

        count = pd.Series(data=counts, index=self._mappings.source_fields())

        return count

    def describe(self):
        return super()._describe()

    def _df_to_series(self, df):
        return df[self.name]
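Note: `count` issues one exists query per source field, as the docstring warns. The equivalent raw request for a single field (a sketch; names are illustrative):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch()

body = {"query": {"exists": {"field": "Carrier"}}}
resp = es.count(index='flights', body=body)
print(resp['count'])  # number of docs where 'Carrier' is present, i.e. non-NA
```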
    # ----------------------------------------------------------------------
    # Rendering Methods
    def __repr__(self):
        """
        From pandas
        """
        buf = StringIO()

        max_rows = pd.get_option("display.max_rows")

        self.to_string(buf=buf, na_rep='NaN', float_format=None, header=True, index=True, length=True,
                       dtype=True, name=True, max_rows=max_rows)

        return buf.getvalue()

    def to_string(self, buf=None, na_rep='NaN',
                  float_format=None, header=True,
                  index=True, length=True, dtype=True,
                  name=True, max_rows=None):
        """
        From pandas 0.24.2

        Render a string representation of the Series.

        Parameters
        ----------
        buf : StringIO-like, optional
            buffer to write to
        na_rep : string, optional
            string representation of NAN to use, default 'NaN'
        float_format : one-parameter function, optional
            formatter function to apply to columns' elements if they are floats
            default None
        header : boolean, default True
            Add the Series header (index name)
        index : bool, optional
            Add index (row) labels, default True
        length : boolean, default False
            Add the Series length
        dtype : boolean, default False
            Add the Series dtype
        name : boolean, default False
            Add the Series name if not None
        max_rows : int, optional
            Maximum number of rows to show before truncating. If None, show
            all.

        Returns
        -------
        formatted : string (if not buffer passed)
        """
        if max_rows == None:
            max_rows = pd.get_option("display.max_rows")

        df = self._fake_head_tail_df(max_rows=max_rows + 1)

        s = self._df_to_series(df)

        formatter = Series.SeriesFormatter(s, len(self), name=name, length=length,
                                           header=header, index=index,
                                           dtype=dtype, na_rep=na_rep,
                                           float_format=float_format,
                                           max_rows=max_rows)
        result = formatter.to_string()

        # catch contract violations
        if not isinstance(result, compat.text_type):
            raise AssertionError("result must be of type unicode, type"
                                 " of result is {0!r}"
                                 "".format(result.__class__.__name__))

        if buf is None:
            return result
        else:
            try:
                buf.write(result)
            except AttributeError:
                with open(buf, 'w') as f:
                    f.write(result)

    class SeriesFormatter(fmt.SeriesFormatter):
        """
        A hacked overridden version of pandas.io.formats.SeriesFormatter that writes correct length
        """
        def __init__(self, series, series_length, buf=None, length=True, header=True, index=True,
                     na_rep='NaN', name=False, float_format=None, dtype=True,
                     max_rows=None):
            super().__init__(series, buf=buf, length=length, header=header, index=index,
                             na_rep=na_rep, name=name, float_format=float_format, dtype=dtype,
                             max_rows=max_rows)
            self._series_length = series_length

        def _get_footer(self):
            """
            Overridden with length change
            (from pandas 0.24.2 io.formats.SeriesFormatter)
            """
            name = self.series.name
            footer = ''

            if getattr(self.series.index, 'freq', None) is not None:
                footer += 'Freq: {freq}'.format(freq=self.series.index.freqstr)

            if self.name is not False and name is not None:
                if footer:
                    footer += ', '

                series_name = pprint_thing(name,
                                           escape_chars=('\t', '\r', '\n'))
                footer += ("Name: {sname}".format(sname=series_name)
                           if name is not None else "")

            if (self.length is True or
                    (self.length == 'truncate' and self.truncate_v)):
                if footer:
                    footer += ', '
                footer += 'Length: {length}'.format(length=self._series_length)

            if self.dtype is not False and self.dtype is not None:
                name = getattr(self.tr_series.dtype, 'name', None)
                if name:
                    if footer:
                        footer += ', '
                    footer += 'dtype: {typ}'.format(typ=pprint_thing(name))

            # level infos are added to the end and in a new line, like it is done
            # for Categoricals
            if is_categorical_dtype(self.tr_series.dtype):
                level_info = self.tr_series._values._repr_categories_info()
                if footer:
                    footer += "\n"
                footer += level_info

            return compat.text_type(footer)
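Note: the formatter override exists because the rendered series only materialises head+tail rows, so stock pandas would print that truncated length in the footer. A sketch of the difference:

```python
# Only max_rows+1 rows are fetched for display, but the footer must report
# the true document count from Elasticsearch.
displayed_rows = 11   # len(s) seen by the stock formatter
true_length = 13059   # len(eland.Series), from a count query (illustrative)

# stock pandas footer:  'Length: 11'
# overridden footer:    'Length: 13059'  (uses self._series_length)
```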
@@ -1,5 +1,6 @@
 # File called _pytest for PyCharm compatability

+import numpy as np
 from pandas.util.testing import (
     assert_series_equal, assert_frame_equal)

@@ -16,7 +17,7 @@ class TestMapping(TestData):

         assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields()

-        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype']))
+        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings._mappings_capabilities['es_dtype']))

         assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields()

@@ -24,7 +25,7 @@ class TestMapping(TestData):
         mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)

         assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields()
-        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype']))
+        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings._mappings_capabilities['es_dtype']))
         assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields()

         # Pick 1 source field
@@ -43,7 +44,7 @@ class TestMapping(TestData):

         # Check original is still ok
         assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields()
-        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype']))
+        assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings._mappings_capabilities['es_dtype']))
         assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields()

     def test_dtypes(self):
@@ -88,3 +89,36 @@ class TestMapping(TestData):
         assert 'object' == field_capabilities['pd_dtype']
         assert True == field_capabilities['searchable']
         assert True == field_capabilities['aggregatable']
+
+    def test_generate_es_mappings(self):
+        df = pd.DataFrame(data={'A': np.random.rand(3),
+                                'B': 1,
+                                'C': 'foo',
+                                'D': pd.Timestamp('20190102'),
+                                'E': [1.0, 2.0, 3.0],
+                                'F': False,
+                                'G': [1, 2, 3]},
+                          index=['0','1','2'])
+
+        expected_mappings = {'mappings': {
+            'properties': {'A': {'type': 'double'},
+                           'B': {'type': 'long'},
+                           'C': {'type': 'keyword'},
+                           'D': {'type': 'date'},
+                           'E': {'type': 'double'},
+                           'F': {'type': 'boolean'},
+                           'G': {'type': 'long'}}}}
+
+        mappings = ed.Mappings._generate_es_mappings(df)
+
+        assert expected_mappings == mappings
+
+        # Now create index
+        index_name = 'eland_test_generate_es_mappings'
+
+        ed.pandas_to_es(df, ELASTICSEARCH_HOST, index_name, if_exists="replace", refresh=True)
+
+        ed_df = ed.DataFrame(ELASTICSEARCH_HOST, index_name)
+        ed_df_head = ed_df.head()
+
+        assert_frame_equal(df, ed_df_head)
@@ -7,7 +7,7 @@ import io
 from pandas.util.testing import (
     assert_series_equal, assert_frame_equal)

-class TestDataFrameIndexing(TestData):
+class TestDataFrameBasics(TestData):

     def test_mapping(self):
         ed_flights_mappings = pd.DataFrame(self.ed_flights()._mappings._mappings_capabilities
@@ -153,3 +153,7 @@ class TestDataFrameIndexing(TestData):
         ed_flights_timestamp.info()
         ed_flights.info()
+
+    def test_to_pandas(self):
+        ed_ecommerce_pd_df = self.ed_ecommerce().to_pandas()
+
+        assert_frame_equal(self.pd_ecommerce(), ed_ecommerce_pd_df)
@@ -15,7 +15,7 @@ class TestDataFrameGetItem(TestData):
         ed_carrier = self.ed_flights()['Carrier']

         # pandas returns a Series here
-        assert_frame_equal(pd.DataFrame(pd_carrier.head(100)), ed_carrier.head(100))
+        assert_series_equal(pd_carrier.head(100), ed_carrier.head(100))

         pd_3_items = self.pd_flights()[['Dest','Carrier','FlightDelay']]
         ed_3_items = self.ed_flights()[['Dest','Carrier','FlightDelay']]
@@ -36,28 +36,12 @@ class TestDataFrameGetItem(TestData):
     def test_getattr_basic(self):
         # Test 1 attribute
         pd_carrier = self.pd_flights().Carrier
-        #ed_carrier = self.ed_flights().Carrier
+        ed_carrier = self.ed_flights().Carrier

-        print(type(pd_carrier))
-        print(pd_carrier)
+        assert_series_equal(pd_carrier.head(100), ed_carrier.head(100))

-    def test_boolean(self):
-        # Test 1 attribute
-        pd_carrier = self.pd_flights()['Carrier == "Kibana Airlines"']
-        #ed_carrier = self.ed_flights().Carrier
-
-        print(type(pd_carrier))
-        print(pd_carrier)
-
-    def test_loc(self):
-        pd = self.pd_flights().loc[10:15, ['Dest', 'Carrier']]
-
-        print(type(pd))
-        print(pd)
-
-        pd = self.pd_flights().loc[10]
-
-        print(type(pd))
-        print(pd)
+        pd_avgticketprice = self.pd_flights().AvgTicketPrice
+        ed_avgticketprice = self.ed_flights().AvgTicketPrice
+
+        assert_series_equal(pd_avgticketprice.head(100), ed_avgticketprice.head(100))
eland/tests/series/test_basics_pytest.py (new file, 32 lines)
@@ -0,0 +1,32 @@
# File called _pytest for PyCharm compatability
from eland.tests.common import TestData

import pandas as pd
import eland as ed
import io

from eland.tests import ELASTICSEARCH_HOST
from eland.tests import FLIGHTS_INDEX_NAME

from pandas.util.testing import (
    assert_series_equal, assert_frame_equal)


class TestSeriesBasics(TestData):

    def test_head_tail(self):
        pd_s = self.pd_flights()['Carrier']
        ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier')

        pd_s_head = pd_s.head(10)
        ed_s_head = ed_s.head(10)

        assert_series_equal(pd_s_head, ed_s_head)

        pd_s_tail = pd_s.tail(10)
        ed_s_tail = ed_s.tail(10)

        assert_series_equal(pd_s_tail, ed_s_tail)

    def test_print(self):
        ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'timestamp')
        print(ed_s.to_string())
eland/utils.py
@@ -1,4 +1,73 @@
-import eland as ed
+from eland import Client
+from eland import DataFrame
+from eland import Mappings

 def read_es(es_params, index_pattern):
-    return ed.DataFrame(client=es_params, index_pattern=index_pattern)
+    return DataFrame(client=es_params, index_pattern=index_pattern)
+
+
+def pandas_to_es(df, es_params, destination_index, if_exists='fail', chunk_size=10000, refresh=False):
+    """
+    Append a pandas DataFrame to an Elasticsearch index.
+    Mainly used in testing.
+
+    Parameters
+    ----------
+    es_params : Elasticsearch client argument
+        elasticsearch-py parameters or
+        elasticsearch-py instance or
+        eland.Client instance
+
+    destination_index : str
+        Name of Elasticsearch index to be written
+
+    if_exists : str, default 'fail'
+        Behavior when the destination index exists. Value can be one of:
+        ``'fail'``
+            If table exists, do nothing.
+        ``'replace'``
+            If table exists, drop it, recreate it, and insert data.
+        ``'append'``
+            If table exists, insert data. Create if does not exist.
+    """
+    client = Client(es_params)
+
+    mapping = Mappings._generate_es_mappings(df)
+
+    # If table exists, check if_exists parameter
+    if client.indices().exists(destination_index):
+        if if_exists == "fail":
+            raise ValueError(
+                "Could not create the index [{0}] because it "
+                "already exists. "
+                "Change the if_exists parameter to "
+                "'append' or 'replace' data.".format(destination_index)
+            )
+        elif if_exists == "replace":
+            client.indices().delete(destination_index)
+            client.indices().create(destination_index, mapping)
+        #elif if_exists == "append":
+            # TODO validate mapping is compatible
+    else:
+        client.indices().create(destination_index, mapping)
+
+    # Now add data
+    actions = []
+    n = 0
+    for row in df.iterrows():
+        # Use index as _id
+        id = row[0]
+        values = row[1].to_dict()
+
+        # Use integer as id field for repeatable results
+        action = {'_index': destination_index, '_source': values, '_id': str(id)}
+
+        actions.append(action)
+
+        n = n + 1
+
+        if n % chunk_size == 0:
+            client.bulk(actions, refresh=refresh)
+            actions = []
+
+    client.bulk(actions, refresh=refresh)
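Note: together `pandas_to_es` and `read_es` give a round trip used throughout the new tests. A hedged end-to-end sketch (host and index name are illustrative; requires a running Elasticsearch):

```python
import pandas as pd
import eland as ed

df = pd.DataFrame({'a': [1.0, 2.0], 'b': ['x', 'y']}, index=['0', '1'])

# write the frame, generating the mapping and replacing any existing index
ed.pandas_to_es(df, 'localhost', 'demo_index', if_exists='replace', refresh=True)

# proxy the stored data back as an eland.DataFrame
ed_df = ed.read_es('localhost', 'demo_index')
print(ed_df.head())
```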