Series rename and arithmetic initial implementation

Partially implemented, tests fail with this commit.
This commit is contained in:
Stephen Dodson 2019-11-21 15:39:13 +00:00
parent 4935a7b4ee
commit c12bf9357b
16 changed files with 582 additions and 134 deletions

View File

@ -43,7 +43,7 @@ class DataFrame(NDFrame):
- elasticsearch-py instance or
- eland.Client instance
index_pattern: str
Elasticsearch index pattern (e.g. 'flights' or 'filebeat-\*')
Elasticsearch index pattern (e.g. 'flights' or 'filebeat-*')
columns: list of str, optional
List of DataFrame columns. A subset of the Elasticsearch index's fields.
index_field: str, optional

View File

@ -182,7 +182,7 @@ class Mappings:
"""
all_fields_caps_fields = all_fields_caps['fields']
columns = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable']
field_names = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable']
capability_matrix = {}
for field, field_caps in all_fields_caps_fields.items():
@ -208,7 +208,7 @@ class Mappings:
format(field, vv['non_searchable_indices']),
UserWarning)
capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=columns)
capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=field_names)
return capability_matrix_df.sort_index()
@ -325,14 +325,14 @@ class Mappings:
mappings = {}
mappings['properties'] = {}
for column_name, dtype in dataframe.dtypes.iteritems():
if geo_points is not None and column_name in geo_points:
for field_name, dtype in dataframe.dtypes.iteritems():
if geo_points is not None and field_name in geo_points:
es_dtype = 'geo_point'
else:
es_dtype = Mappings._pd_dtype_to_es_dtype(dtype)
mappings['properties'][column_name] = {}
mappings['properties'][column_name]['type'] = es_dtype
mappings['properties'][field_name] = {}
mappings['properties'][field_name]['type'] = es_dtype
return {"mappings": mappings}
@ -407,12 +407,12 @@ class Mappings:
return is_source_field
def aggregatable_columns(self, columns=None):
def aggregatable_field_names(self, field_names=None):
"""
Return a dict of aggregatable columns from all columns or columns list
Return a dict of aggregatable field names, from all field names or a given field_names list
{'customer_full_name.keyword': 'customer_full_name', ...}
Logic here is that column names are '_source' fields and keyword fields
Logic here is that field names are '_source' fields and keyword fields
may be nested beneath the field. E.g.
customer_full_name: text
customer_full_name.keyword: keyword
@ -424,28 +424,28 @@ class Mappings:
dict
e.g. {'customer_full_name.keyword': 'customer_full_name', ...}
"""
if columns is None:
columns = self.source_fields()
if field_names is None:
field_names = self.source_fields()
aggregatables = {}
for column in columns:
capabilities = self.field_capabilities(column)
for field_name in field_names:
capabilities = self.field_capabilities(field_name)
if capabilities['aggregatable']:
aggregatables[column] = column
aggregatables[field_name] = field_name
else:
# Try 'column.keyword'
column_keyword = column + '.keyword'
capabilities = self.field_capabilities(column_keyword)
# Try 'field_name.keyword'
field_name_keyword = field_name + '.keyword'
capabilities = self.field_capabilities(field_name_keyword)
if capabilities['aggregatable']:
aggregatables[column_keyword] = column
aggregatables[field_name_keyword] = field_name
else:
# Aggregations not supported for this field
raise ValueError("Aggregations not supported for ", column)
raise ValueError("Aggregations not supported for ", field_name)
return aggregatables
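A usage sketch (ecommerce demo index assumed) showing the keyword-to-source direction the code builds:

aggregatables = mappings.aggregatable_field_names(['customer_full_name', 'currency'])
# text fields resolve via their keyword sub-field; keyword fields map to themselves
# -> {'customer_full_name.keyword': 'customer_full_name', 'currency': 'currency'}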
def numeric_source_fields(self, columns, include_bool=True):
def numeric_source_fields(self, field_names, include_bool=True):
"""
Returns
-------
@ -461,10 +461,10 @@ class Mappings:
df = self._mappings_capabilities[(self._mappings_capabilities._source == True) &
((self._mappings_capabilities.pd_dtype == 'int64') |
(self._mappings_capabilities.pd_dtype == 'float64'))]
# if columns exists, filter index with columns
if columns is not None:
# reindex adds NA for non-existing columns (non-numeric), so drop these after reindex
df = df.reindex(columns)
# if field_names exists, filter index with field_names
if field_names is not None:
# reindex adds NA for non-existing field_names (non-numeric), so drop these after reindex
df = df.reindex(field_names)
df.dropna(inplace=True)
# return as list
@ -488,16 +488,16 @@ class Mappings:
"""
return len(self.source_fields())
def dtypes(self, columns=None):
def dtypes(self, field_names=None):
"""
Returns
-------
dtypes: pd.Series
Source field name + pd_dtype
"""
if columns is not None:
if field_names is not None:
return pd.Series(
{key: self._source_field_pd_dtypes[key] for key in columns})
{key: self._source_field_pd_dtypes[key] for key in field_names})
return pd.Series(self._source_field_pd_dtypes)
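For example (flights demo index assumed), filtering to a field_names subset returns a pandas Series keyed by source field name:

mappings.dtypes(field_names=['Carrier', 'AvgTicketPrice', 'Cancelled'])
# Carrier            object
# AvgTicketPrice    float64
# Cancelled            bool
# dtype: object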

View File

@ -49,9 +49,7 @@ class NDFrame:
A reference to a Elasticsearch python client
"""
if query_compiler is None:
query_compiler = ElandQueryCompiler(client=client,
index_pattern=index_pattern,
columns=columns,
query_compiler = ElandQueryCompiler(client=client, index_pattern=index_pattern, field_names=columns,
index_field=index_field)
self._query_compiler = query_compiler

View File

@ -12,7 +12,7 @@ class Operations:
A collector of the queries and selectors we apply to queries to return the appropriate results.
For example,
- a list of the columns in the DataFrame (a subset of columns in the index)
- a list of the field_names in the DataFrame (a subset of field_names in the index)
- a size limit on the results (e.g. for head(n=5))
- a query to filter the results (e.g. df.A > 10)
@ -66,26 +66,34 @@ class Operations:
task = ('tail', (index.sort_field, n))
self._tasks.append(task)
def set_columns(self, columns):
# Setting columns at different phases of the task list may result in different
# operations. So instead of setting columns once, set when it happens in call chain
if type(columns) is not list:
columns = list(columns)
def arithmetic_op_fields(self, field_name, op_name, left_field, right_field):
task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field))))
# Set this as a column we want to retrieve
self.set_field_names([field_name])
# TODO - column renaming
# TODO - validate we are setting columns to a subset of last columns?
task = ('columns', columns)
self._tasks.append(task)
# Iterate backwards through task list looking for last 'columns' task
def set_field_names(self, field_names):
# Setting field_names at different phases of the task list may result in different
# operations. So instead of setting field_names once, set when it happens in call chain
if type(field_names) is not list:
field_names = list(field_names)
# TODO - field_name renaming
# TODO - validate we are setting field_names to a subset of last field_names?
task = ('field_names', field_names)
self._tasks.append(task)
# Iterate backwards through task list looking for last 'field_names' task
for task in reversed(self._tasks):
if task[0] == 'columns':
if task[0] == 'field_names':
return task[1]
return None
def get_columns(self):
# Iterate backwards through task list looking for last 'columns' task
def get_field_names(self):
# Iterate backwards through task list looking for last 'field_names' task
for task in reversed(self._tasks):
if task[0] == 'columns':
if task[0] == 'field_names':
return task[1]
return None
@ -103,8 +111,8 @@ class Operations:
"not supported {0} {1}"
.format(query_params, post_processing))
# Only return requested columns
fields = query_compiler.columns
# Only return requested field_names
fields = query_compiler.field_names
counts = {}
for field in fields:
@ -143,13 +151,13 @@ class Operations:
Parameters
----------
field_types: str, default None
if `aggregatable` use only columns whose fields in Elasticsearch are aggregatable.
if `aggregatable` use only field_names whose fields in Elasticsearch are aggregatable.
If `None`, use only numeric fields.
Returns
-------
pandas.Series
Series containing results of `func` applied to the column(s)
Series containing results of `func` applied to the field_name(s)
"""
query_params, post_processing = self._resolve_tasks()
@ -157,7 +165,7 @@ class Operations:
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
field_names = self.get_field_names()
body = Query(query_params['query'])
@ -165,9 +173,9 @@ class Operations:
# therefore we include an optional all parameter on operations
# that call _metric_aggs
if field_types=='aggregatable':
source_fields = query_compiler._mappings.aggregatable_columns(columns)
source_fields = query_compiler._mappings.aggregatable_field_names(field_names)
else:
source_fields = query_compiler._mappings.numeric_source_fields(columns)
source_fields = query_compiler._mappings.numeric_source_fields(field_names)
for field in source_fields:
body.metric_aggs(field, func, field)
@ -209,7 +217,7 @@ class Operations:
Returns
-------
pandas.Series
Series containing results of `func` applied to the column(s)
Series containing results of `func` applied to the field_name(s)
Series containing results of `func` applied to the field_name(s)
"""
query_params, post_processing = self._resolve_tasks()
@ -217,14 +225,14 @@ class Operations:
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
field_names = self.get_field_names()
# Get just aggregatable columns
aggregatable_columns = query_compiler._mappings.aggregatable_columns(columns)
# Get just aggregatable field_names
aggregatable_field_names = query_compiler._mappings.aggregatable_field_names(field_names)
body = Query(query_params['query'])
for field in aggregatable_columns.keys():
for field in aggregatable_field_names.keys():
body.terms_aggs(field, func, field, es_size=es_size)
response = query_compiler._client.search(
@ -234,12 +242,12 @@ class Operations:
results = {}
for key, value in aggregatable_columns.items():
for bucket in response['aggregations'][columns[0]]['buckets']:
for key, value in aggregatable_field_names.items():
for bucket in response['aggregations'][field_names[0]]['buckets']:
results[bucket['key']] = bucket['doc_count']
try:
name = columns[0]
name = field_names[0]
except IndexError:
name = None
@ -248,16 +256,16 @@ class Operations:
return s
def _hist_aggs(self, query_compiler, num_bins):
# Get histogram bins and weights for numeric columns
# Get histogram bins and weights for numeric field_names
query_params, post_processing = self._resolve_tasks()
size = self._size(query_params, post_processing)
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
field_names = self.get_field_names()
numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns)
numeric_source_fields = query_compiler._mappings.numeric_source_fields(field_names)
body = Query(query_params['query'])
@ -331,7 +339,7 @@ class Operations:
Pandas supports a lot of options here, and these options generally work on text and numerics in pandas.
Elasticsearch has metric aggs and terms aggs, so behaviour will differ.
Pandas aggs that return columns (as opposed to transformed rows):
Pandas aggs that return field_names (as opposed to transformed rows):
all
any
@ -398,14 +406,14 @@ class Operations:
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
field_names = self.get_field_names()
body = Query(query_params['query'])
# convert pandas aggs to ES equivalent
es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)
for field in columns:
for field in field_names:
for es_agg in es_aggs:
# If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
if isinstance(es_agg, tuple):
@ -427,7 +435,7 @@ class Operations:
"""
results = {}
for field in columns:
for field in field_names:
values = list()
for es_agg in es_aggs:
if isinstance(es_agg, tuple):
@ -448,9 +456,9 @@ class Operations:
if size is not None:
raise NotImplementedError("Can not count field matches if size is set {}".format(size))
columns = self.get_columns()
field_names = self.get_field_names()
numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns, include_bool=False)
numeric_source_fields = query_compiler._mappings.numeric_source_fields(field_names, include_bool=False)
# for each field we compute:
# count, mean, std, min, 25%, 50%, 75%, max
@ -535,10 +543,15 @@ class Operations:
size, sort_params = Operations._query_params_to_size_and_sort(query_params)
body = Query(query_params['query'])
script_fields = query_params['query_script_fields']
query = Query(query_params['query'])
# Only return requested columns
columns = self.get_columns()
body = query.to_search_body()
if script_fields is not None:
body['script_fields'] = script_fields
# Only return requested field_names
field_names = self.get_field_names()
es_results = None
@ -551,14 +564,14 @@ class Operations:
index=query_compiler._index_pattern,
size=size,
sort=sort_params,
body=body.to_search_body(),
_source=columns)
body=body,
_source=field_names)
else:
is_scan = True
es_results = query_compiler._client.scan(
index=query_compiler._index_pattern,
query=body.to_search_body(),
_source=columns)
query=body,
_source=field_names)
# create post sort
if sort_params is not None:
post_processing.append(self._sort_params_to_postprocessing(sort_params))
@ -575,9 +588,9 @@ class Operations:
df = self._apply_df_post_processing(df, post_processing)
collector.collect(df)
def iloc(self, index, columns):
# index and columns are indexers
task = ('iloc', (index, columns))
def iloc(self, index, field_names):
# index and field_names are indexers
task = ('iloc', (index, field_names))
self._tasks.append(task)
def index_count(self, query_compiler, field):
@ -691,13 +704,13 @@ class Operations:
df = df.sort_values(sort_field, False)
elif action[0] == 'iloc':
index_indexer = action[1][0]
column_indexer = action[1][1]
field_name_indexer = action[1][1]
if index_indexer is None:
index_indexer = slice(None)
if column_indexer is None:
column_indexer = slice(None)
df = df.iloc[index_indexer, column_indexer]
# columns could be in here (and we ignore it)
if field_name_indexer is None:
field_name_indexer = slice(None)
df = df.iloc[index_indexer, field_name_indexer]
# field_names could be in here (and we ignore it)
return df
@ -710,6 +723,7 @@ class Operations:
"query_sort_order": None,
"query_size": None,
"query_fields": None,
"query_script_fields": None,
"query": Query()}
post_processing = []
@ -727,6 +741,8 @@ class Operations:
query_params, post_processing = self._resolve_query_terms(task, query_params, post_processing)
elif task[0] == 'boolean_filter':
query_params, post_processing = self._resolve_boolean_filter(task, query_params, post_processing)
elif task[0] == 'arithmetic_op_fields':
query_params, post_processing = self._resolve_arithmetic_op_fields(task, query_params, post_processing)
else: # a lot of operations simply post-process the dataframe - put these straight through
query_params, post_processing = self._resolve_post_processing_task(task, query_params, post_processing)
@ -858,9 +874,44 @@ class Operations:
return query_params, post_processing
def _resolve_arithmetic_op_fields(self, item, query_params, post_processing):
# task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field))))
field_name = item[1][0]
op_name = item[1][1][0]
left_field = item[1][1][1][0]
right_field = item[1][1][1][1]
"""
(if op_name = 'truediv')
"script_fields": {
"field_name": {
"script": {
"source": "doc[left_field].value / doc[right_field].value"
}
}
}
"""
if op_name == 'truediv':
op = '/'
else:
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
source = "doc['{0}'].value {1} doc['{2}'].value".format(left_field, op, right_field)
if query_params['query_script_fields'] is None:
query_params['query_script_fields'] = {}
query_params['query_script_fields'][field_name] = {
'script': {
'source': source
}
}
return query_params, post_processing
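Combined with the script_fields handling earlier in this file (where body['script_fields'] is set before the search call), a truediv on two hypothetical numeric fields lands in the Elasticsearch search body as:

{
    "script_fields": {
        "total_quantity_truediv_taxful_total_price": {
            "script": {
                "source": "doc['total_quantity'].value / doc['taxful_total_price'].value"
            }
        }
    }
}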
def _resolve_post_processing_task(self, item, query_params, post_processing):
# Just do this in post-processing
if item[0] != 'columns':
if item[0] != 'field_names':
post_processing.append(item)
return query_params, post_processing
@ -885,11 +936,11 @@ class Operations:
query_params, post_processing = self._resolve_tasks()
size, sort_params = Operations._query_params_to_size_and_sort(query_params)
columns = self.get_columns()
field_names = self.get_field_names()
buf.write(" size: {0}\n".format(size))
buf.write(" sort_params: {0}\n".format(sort_params))
buf.write(" columns: {0}\n".format(columns))
buf.write(" field_names: {0}\n".format(field_names))
buf.write(" post_processing: {0}\n".format(post_processing))
def update_query(self, boolean_filter):

View File

@ -15,10 +15,12 @@ class Query:
def __init__(self, query=None):
if query is None:
self._query = BooleanFilter()
self._script_fields = {}
self._aggs = {}
else:
# Deep copy the incoming query so we can change it
self._query = deepcopy(query._query)
self._script_fields = deepcopy(query._script_fields)
self._aggs = deepcopy(query._aggs)
def exists(self, field, must=True):
@ -157,5 +159,14 @@ class Query:
else:
self._query = self._query & boolean_filter
def arithmetic_op_fields(self, op_name, left_field, right_field):
# partially implemented - _script_fields is a plain dict, so test truthiness
if not self._script_fields:
body = None
else:
body = {"script_fields": self._script_fields}
return body
def __repr__(self):
return repr(self.to_search_body())

View File

@ -38,12 +38,8 @@ class ElandQueryCompiler:
A way to mitigate this would be to post process this drop - TODO
"""
def __init__(self,
client=None,
index_pattern=None,
columns=None,
index_field=None,
operations=None):
def __init__(self, client=None, index_pattern=None, field_names=None, index_field=None, operations=None,
name_mapper=None):
self._client = Client(client)
self._index_pattern = index_pattern
@ -58,29 +54,53 @@ class ElandQueryCompiler:
else:
self._operations = operations
if columns is not None:
self.columns = columns
if field_names is not None:
self.field_names = field_names
if name_mapper is None:
self._name_mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
else:
self._name_mapper = name_mapper
def _get_index(self):
return self._index
def _get_field_names(self):
field_names = self._operations.get_field_names()
if field_names is None:
# default to all
field_names = self._mappings.source_fields()
return pd.Index(field_names)
def _set_field_names(self, field_names):
self._operations.set_field_names(field_names)
field_names = property(_get_field_names, _set_field_names)
def _get_columns(self):
columns = self._operations.get_columns()
columns = self._operations.get_field_names()
if columns is None:
# default to all
columns = self._mappings.source_fields()
# map renames
columns = self._name_mapper.field_to_display_names(columns)
return pd.Index(columns)
def _set_columns(self, columns):
self._operations.set_columns(columns)
# map renames
columns = self._name_mapper.display_to_field_names(columns)
self._operations.set_field_names(columns)
columns = property(_get_columns, _set_columns)
index = property(_get_index)
@property
def dtypes(self):
columns = self._operations.get_columns()
columns = self._operations.get_field_names()
return self._mappings.dtypes(columns)
@ -194,6 +214,12 @@ class ElandQueryCompiler:
row = hit['_source']
# script_fields appear in 'fields'
if 'fields' in hit:
fields = hit['fields']
for key, value in fields.items():
row[key] = value
# get index value - can be _id or can be field value in source
if self._index.is_source_field:
index_field = row[self._index.index_field]
@ -221,6 +247,10 @@ class ElandQueryCompiler:
is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
df[missing] = pd.Series(dtype=pd_dtype)
# Rename columns
if not self._name_mapper.empty:
df.rename(columns=self._name_mapper.display_names_mapper(), inplace=True)
# Sort columns in mapping order
df = df[self.columns]
@ -267,6 +297,8 @@ class ElandQueryCompiler:
out[field_name].append(x)
else:
out[field_name] = x
else:
out[name[:-1]] = x
flatten(y)
@ -307,13 +339,16 @@ class ElandQueryCompiler:
return df
def copy(self):
return ElandQueryCompiler(
client=self._client,
index_pattern=self._index_pattern,
columns=None, # columns are embedded in operations
index_field=self._index.index_field,
operations=self._operations.copy()
)
return ElandQueryCompiler(client=self._client, index_pattern=self._index_pattern, field_names=None,
index_field=self._index.index_field, operations=self._operations.copy(),
name_mapper=self._name_mapper.copy())
def rename(self, renames):
result = self.copy()
result._name_mapper.rename_display_name(renames)
return result
def head(self, n):
result = self.copy()
@ -364,14 +399,7 @@ class ElandQueryCompiler:
if numeric:
raise NotImplementedError("Not implemented yet...")
result._operations.set_columns(list(key))
return result
def view(self, index=None, columns=None):
result = self.copy()
result._operations.iloc(index, columns)
result._operations.set_field_names(list(key))
return result
@ -382,7 +410,7 @@ class ElandQueryCompiler:
if columns is not None:
# columns is a pandas.Index so we can use pandas drop feature
new_columns = self.columns.drop(columns)
result._operations.set_columns(new_columns.to_list())
result._operations.set_field_names(new_columns.to_list())
if index is not None:
result._operations.drop_index_values(self, self.index.index_field, index)
@ -433,3 +461,141 @@ class ElandQueryCompiler:
return result
def check_arithmetics(self, right):
"""
Compare 2 query_compilers to see if arithmetic operations can be performed by the NDFrame object.
This does very basic comparisons and ignores some of the complexities of incompatible task lists
Raises exception if incompatible
Parameters
----------
right: ElandQueryCompiler
The query compiler to compare self to
Raises
------
TypeError, ValueError
If arithmetic operations aren't possible
"""
if not isinstance(right, ElandQueryCompiler):
raise TypeError(
"Incompatible types "
"{0} != {1}".format(type(self), type(right))
)
if self._client._es != right._client._es:
raise ValueError(
"Can not perform arithmetic operations across different clients"
"{0} != {1}".format(self._client._es, right._client._es)
)
if self._index.index_field != right._index.index_field:
raise ValueError(
"Can not perform arithmetic operations across different index fields "
"{0} != {1}".format(self._index.index_field, right._index.index_field)
)
if self._index_pattern != right._index_pattern:
raise ValueError(
"Can not perform arithmetic operations across different index patterns"
"{0} != {1}".format(self._index_pattern, right._index_pattern)
)
def arithmetic_op_fields(self, field_name, op, left_field, right_field):
result = self.copy()
result._operations.arithmetic_op_fields(field_name, op, left_field, right_field)
return result
"""
Internal class to deal with column renaming and script_fields
"""
class DisplayNameToFieldNameMapper:
def __init__(self,
field_to_display_names=None,
display_to_field_names=None):
if field_to_display_names is not None:
self._field_to_display_names = field_to_display_names
else:
self._field_to_display_names = dict()
if display_to_field_names is not None:
self._display_to_field_names = display_to_field_names
else:
self._display_to_field_names = dict()
def rename_display_name(self, renames):
for current_display_name, new_display_name in renames.items():
if current_display_name in self._display_to_field_names:
# has been renamed already - update name
field_name = self._display_to_field_names[current_display_name]
del self._display_to_field_names[current_display_name]
del self._field_to_display_names[field_name]
self._display_to_field_names[new_display_name] = field_name
self._field_to_display_names[field_name] = new_display_name
else:
# new rename - assume 'current_display_name' is 'field_name'
field_name = current_display_name
# if field_name is already mapped ignore
if field_name not in self._field_to_display_names:
self._display_to_field_names[new_display_name] = field_name
self._field_to_display_names[field_name] = new_display_name
def field_names_to_list(self):
return list(self._field_to_display_names.keys())
def display_names_to_list(self):
return list(self._display_to_field_names.keys())
# Return mapper values as dict
def display_names_mapper(self):
return self._field_to_display_names
@property
def empty(self):
return not self._display_to_field_names
def field_to_display_names(self, field_names):
if self.empty:
return field_names
display_names = []
for field_name in field_names:
if field_name in self._field_to_display_names:
display_name = self._field_to_display_names[field_name]
else:
display_name = field_name
display_names.append(display_name)
return display_names
def display_to_field_names(self, display_names):
if self.empty:
return display_names
field_names = []
for display_name in display_names:
if display_name in self._display_to_field_names:
field_name = self._display_to_field_names[display_name]
else:
field_name = display_name
field_names.append(field_name)
return field_names
def __constructor__(self, *args, **kwargs):
return type(self)(*args, **kwargs)
def copy(self):
return self.__constructor__(
# copy the dicts so renames on the copy do not mutate the original
field_to_display_names=self._field_to_display_names.copy(),
display_to_field_names=self._display_to_field_names.copy()
)
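A minimal usage sketch (names assumed) of the two-way mapping this class maintains:

mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
mapper.rename_display_name({'Carrier': 'Airline'})
mapper.display_to_field_names(['Airline', 'Origin'])  # -> ['Carrier', 'Origin']
mapper.field_to_display_names(['Carrier', 'Origin'])  # -> ['Airline', 'Origin']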

View File

@ -15,7 +15,7 @@ Based on NDFrame which underpins eland.DataFrame
"""
import warnings
from io import StringIO
import pandas as pd
@ -98,6 +98,20 @@ class Series(NDFrame):
name = property(_get_name)
def rename(self, new_name):
"""
ONLY COLUMN rename supported
Parameters
----------
new_name
Returns
-------
"""
return Series(query_compiler=self._query_compiler.rename({self.name: new_name}))
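Usage sketch, mirroring the new test_series_rename test below (flights demo index assumed):

carrier = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier')
renamed = carrier.rename("renamed")
# only the display name changes; the Elasticsearch field queried is still 'Carrier'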
def head(self, n=5):
return Series(query_compiler=self._query_compiler.head(n))
@ -141,7 +155,7 @@ class Series(NDFrame):
"""
if not isinstance(es_size, int):
raise TypeError("es_size must be a positive integer.")
if not es_size>0:
if not es_size > 0:
raise ValueError("es_size must be a positive integer.")
return self._query_compiler.value_counts(es_size)
@ -276,3 +290,35 @@ class Series(NDFrame):
"""
return 1
def info_es(self):
buf = StringIO()
super()._info_es(buf)
return buf.getvalue()
def __truediv__(self, right):
return self.truediv(right)
def truediv(self, right):
"""
return a / b
a & b == Series
a & b must share same eland.Client, index_pattern and index_field
"""
if isinstance(right, Series):
# Check compatibility
self._query_compiler.check_arithmetics(right._query_compiler)
field_name = "{0}_{1}_{2}".format(self.name, "truediv", right.name)
# Compatible, so create new Series
return Series(query_compiler=self._query_compiler.arithmetic_op_fields(
field_name, 'truediv', self.name, right.name))
else:
raise TypeError(
"Can only perform arithmetic operation on selected types "
"{0} != {1}".format(type(self), type(right))
)
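A usage sketch of the new arithmetic (ecommerce demo index assumed; evaluation happens server-side via the generated script_field):

ed_df = ed.DataFrame('localhost', 'ecommerce')
avg_price = ed_df['taxful_total_price'] / ed_df['total_quantity']
# avg_price is a lazy eland.Series named
# 'taxful_total_price_truediv_total_quantity', computed by Elasticsearch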

View File

View File

@ -0,0 +1,28 @@
# File called _pytest for PyCharm compatibility
from elasticsearch import Elasticsearch
import eland as ed
from eland.tests.common import TestData
import pytest
class TestClientEq(TestData):
def test_self_eq(self):
es = Elasticsearch('localhost')
client = ed.Client(es)
assert client != es
assert client == client
def test_non_self_ne(self):
es1 = Elasticsearch('localhost')
es2 = Elasticsearch('localhost')
client1 = ed.Client(es1)
client2 = ed.Client(es2)
assert client1 != client2

View File

@ -80,7 +80,7 @@ def assert_eland_frame_equal(left, right):
assert_frame_equal(left._to_pandas(), right._to_pandas())
def assert_pandas_eland_series_equal(left, right):
def assert_pandas_eland_series_equal(left, right, check_less_precise=False):
if not isinstance(left, pd.Series):
raise AssertionError("Expected type {exp_type}, found {act_type} instead".format(
exp_type='pd.Series', act_type=type(left)))
@ -90,4 +90,4 @@ def assert_pandas_eland_series_equal(left, right):
exp_type='ed.Series', act_type=type(right)))
# Use pandas tests to check similarity
assert_series_equal(left, right._to_pandas())
assert_series_equal(left, right._to_pandas(), check_less_precise=check_less_precise)

View File

@ -8,7 +8,7 @@ class TestMappingsAggregatables(TestData):
def test_ecommerce_all_aggregatables(self):
ed_ecommerce = self.ed_ecommerce()
aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_columns()
aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_field_names()
expected = {'category.keyword': 'category',
'currency': 'currency',
@ -67,6 +67,6 @@ class TestMappingsAggregatables(TestData):
'customer_first_name.keyword': 'customer_first_name',
'type': 'type', 'user': 'user'}
aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_columns(expected.values())
aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_field_names(expected.values())
assert expected == aggregatables

View File

@ -21,6 +21,6 @@ class TestMappingsDtypes(TestData):
pd_flights = self.pd_flights()[['Carrier', 'AvgTicketPrice', 'Cancelled']]
pd_dtypes = pd_flights.dtypes
ed_dtypes = ed_flights._query_compiler._mappings.dtypes(columns=['Carrier', 'AvgTicketPrice', 'Cancelled'])
ed_dtypes = ed_flights._query_compiler._mappings.dtypes(field_names=['Carrier', 'AvgTicketPrice', 'Cancelled'])
assert_series_equal(pd_dtypes, ed_dtypes)

View File

@ -13,13 +13,13 @@ class TestMappingsNumericSourceFields(TestData):
ed_flights = self.ed_flights()
pd_flights = self.pd_flights()
ed_numeric = ed_flights._query_compiler._mappings.numeric_source_fields(columns=None, include_bool=False)
ed_numeric = ed_flights._query_compiler._mappings.numeric_source_fields(field_names=None, include_bool=False)
pd_numeric = pd_flights.select_dtypes(include=np.number)
assert pd_numeric.columns.to_list() == ed_numeric
def test_ecommerce_selected_non_numeric_source_fields(self):
columns = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'user']
field_names = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'user']
"""
Note: none of these are numeric
category object
@ -29,16 +29,16 @@ class TestMappingsNumericSourceFields(TestData):
user object
"""
ed_ecommerce = self.ed_ecommerce()[columns]
pd_ecommerce = self.pd_ecommerce()[columns]
ed_ecommerce = self.ed_ecommerce()[field_names]
pd_ecommerce = self.pd_ecommerce()[field_names]
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False)
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False)
pd_numeric = pd_ecommerce.select_dtypes(include=np.number)
assert pd_numeric.columns.to_list() == ed_numeric
def test_ecommerce_selected_mixed_numeric_source_fields(self):
columns = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'total_quantity', 'user']
field_names = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'total_quantity', 'user']
"""
Note: one is numeric
@ -50,16 +50,16 @@ class TestMappingsNumericSourceFields(TestData):
user object
"""
ed_ecommerce = self.ed_ecommerce()[columns]
pd_ecommerce = self.pd_ecommerce()[columns]
ed_ecommerce = self.ed_ecommerce()[field_names]
pd_ecommerce = self.pd_ecommerce()[field_names]
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False)
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False)
pd_numeric = pd_ecommerce.select_dtypes(include=np.number)
assert pd_numeric.columns.to_list() == ed_numeric
def test_ecommerce_selected_all_numeric_source_fields(self):
columns = ['total_quantity', 'taxful_total_price', 'taxless_total_price']
field_names = ['total_quantity', 'taxful_total_price', 'taxless_total_price']
"""
Note: all are numeric
@ -68,10 +68,10 @@ class TestMappingsNumericSourceFields(TestData):
taxless_total_price float64
"""
ed_ecommerce = self.ed_ecommerce()[columns]
pd_ecommerce = self.pd_ecommerce()[columns]
ed_ecommerce = self.ed_ecommerce()[field_names]
pd_ecommerce = self.pd_ecommerce()[field_names]
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False)
ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False)
pd_numeric = pd_ecommerce.select_dtypes(include=np.number)
assert pd_numeric.columns.to_list() == ed_numeric

View File

@ -0,0 +1,75 @@
# File called _pytest for PyCharm compatibility
import pandas as pd
from pandas.util.testing import assert_series_equal
from eland import ElandQueryCompiler
from eland.tests.common import TestData
class TestQueryCompilerRename(TestData):
def test_query_compiler_basic_rename(self):
field_names = []
display_names = []
mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
assert field_names == mapper.field_names_to_list()
assert display_names == mapper.display_names_to_list()
field_names = ['a']
display_names = ['A']
update_A = {'a' : 'A'}
mapper.rename_display_name(update_A)
assert field_names == mapper.field_names_to_list()
assert display_names == mapper.display_names_to_list()
field_names = ['a', 'b']
display_names = ['A', 'B']
update_B = {'b' : 'B'}
mapper.rename_display_name(update_B)
assert field_names == mapper.field_names_to_list()
assert display_names == mapper.display_names_to_list()
field_names = ['a', 'b']
display_names = ['AA', 'B']
update_AA = {'A' : 'AA'}
mapper.rename_display_name(update_AA)
assert field_names == mapper.field_names_to_list()
assert display_names == mapper.display_names_to_list()
def test_query_compiler_basic_rename_columns(self):
columns = ['a', 'b', 'c', 'd']
mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
display_names = ['A', 'b', 'c', 'd']
update_A = {'a' : 'A'}
mapper.rename_display_name(update_A)
assert display_names == mapper.field_to_display_names(columns)
# Invalid update
display_names = ['A', 'b', 'c', 'd']
update_ZZ = {'a' : 'ZZ'}
mapper.rename_display_name(update_ZZ)
assert display_names == mapper.field_to_display_names(columns)
display_names = ['AA', 'b', 'c', 'd']
update_AA = {'A' : 'AA'} # already renamed to 'A'
mapper.rename_display_name(update_AA)
assert display_names == mapper.field_to_display_names(columns)
display_names = ['AA', 'b', 'C', 'd']
update_AA_C = {'a' : 'AA', 'c' : 'C'} # 'a' rename ignored
mapper.rename_display_name(update_AA_C)
assert display_names == mapper.field_to_display_names(columns)

View File

@ -0,0 +1,50 @@
# File called _pytest for PyCharm compatibility
import eland as ed
from eland.tests.common import TestData, assert_pandas_eland_series_equal
from pandas.util.testing import assert_series_equal
import pytest
class TestSeriesArithmetics(TestData):
def test_ecommerce_series_invalid_div(self):
pd_df = self.pd_ecommerce()
ed_df = self.ed_ecommerce()
# eland / pandas == error
with pytest.raises(TypeError):
ed_df['total_quantity'] / pd_df['taxful_total_price']
def test_ecommerce_series_div(self):
pd_df = self.pd_ecommerce()
ed_df = self.ed_ecommerce()
pd_avg_price = pd_df['total_quantity'] / pd_df['taxful_total_price']
print(pd_avg_price) # this has None as name
ed_avg_price = ed_df['total_quantity'] / ed_df['taxful_total_price']
print(ed_avg_price)
assert_pandas_eland_series_equal(pd_avg_price, ed_avg_price, check_less_precise=True)
def test_ecommerce_series_div_float(self):
pd_df = self.pd_ecommerce()
ed_df = self.ed_ecommerce()
pd_avg_price = pd_df['total_quantity'] / 10.0
print(pd_avg_price)
ed_avg_price = ed_df['total_quantity'] / 10.0
print(ed_avg_price)
def test_ecommerce_series_div_other(self):
ed_df = self.ed_ecommerce()
ed_s1 = ed_df.total_quantity
ed_s2 = ed_df.taxful_total_price
print(ed_s1)
print(ed_s2)
print(ed_s1)
print(ed_s2)

View File

@ -0,0 +1,23 @@
# File called _pytest for PyCharm compatibility
import eland as ed
from eland.tests import ELASTICSEARCH_HOST
from eland.tests import FLIGHTS_INDEX_NAME
from eland.tests.common import TestData
from eland.tests.common import assert_pandas_eland_series_equal
class TestSeriesRename(TestData):
def test_rename(self):
pd_carrier = self.pd_flights()['Carrier']
ed_carrier = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier')
assert_pandas_eland_series_equal(pd_carrier, ed_carrier)
pd_renamed = pd_carrier.rename("renamed")
ed_renamed = ed_carrier.rename("renamed")
assert_pandas_eland_series_equal(pd_renamed, ed_renamed)