mirror of
https://github.com/elastic/eland.git
synced 2025-07-11 00:02:14 +08:00
Adding __r* operations and resolving issues with df.info()
This commit is contained in:
parent
ac8cb302de
commit
b99f25e4ee
@ -519,7 +519,12 @@ class DataFrame(NDFrame):
|
||||
else:
|
||||
_verbose_repr()
|
||||
|
||||
# pandas 0.25.1 uses get_dtype_counts() here. This
|
||||
# returns a Series with strings as the index NOT dtypes.
|
||||
# Therefore, to get consistent ordering we need to
|
||||
# align types with pandas method.
|
||||
counts = self.dtypes.value_counts()
|
||||
counts.index = counts.index.astype(str)
|
||||
dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k
|
||||
in sorted(counts.items())]
|
||||
lines.append('dtypes: {types}'.format(types=', '.join(dtypes)))
|
||||
|
@ -1,6 +1,7 @@
|
||||
import copy
|
||||
from enum import Enum
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
from eland import Index
|
||||
@ -172,7 +173,7 @@ class Operations:
|
||||
# some metrics aggs (including cardinality) work on all aggregatable fields
|
||||
# therefore we include an optional all parameter on operations
|
||||
# that call _metric_aggs
|
||||
if field_types=='aggregatable':
|
||||
if field_types == 'aggregatable':
|
||||
source_fields = query_compiler._mappings.aggregatable_field_names(field_names)
|
||||
else:
|
||||
source_fields = query_compiler._mappings.numeric_source_fields(field_names)
|
||||
@ -193,7 +194,7 @@ class Operations:
|
||||
# }
|
||||
results = {}
|
||||
|
||||
if field_types=='aggregatable':
|
||||
if field_types == 'aggregatable':
|
||||
for key, value in source_fields.items():
|
||||
results[value] = response['aggregations'][key]['value']
|
||||
else:
|
||||
@ -538,7 +539,6 @@ class Operations:
|
||||
|
||||
return collector.ret
|
||||
|
||||
|
||||
def _es_results(self, query_compiler, collector):
|
||||
query_params, post_processing = self._resolve_tasks()
|
||||
|
||||
@ -561,12 +561,24 @@ class Operations:
|
||||
is_scan = False
|
||||
if size is not None and size <= 10000:
|
||||
if size > 0:
|
||||
es_results = query_compiler._client.search(
|
||||
index=query_compiler._index_pattern,
|
||||
size=size,
|
||||
sort=sort_params,
|
||||
body=body,
|
||||
_source=field_names)
|
||||
try:
|
||||
es_results = query_compiler._client.search(
|
||||
index=query_compiler._index_pattern,
|
||||
size=size,
|
||||
sort=sort_params,
|
||||
body=body,
|
||||
_source=field_names)
|
||||
except:
|
||||
# Catch ES error and print debug (currently to stdout)
|
||||
error = {
|
||||
'index': query_compiler._index_pattern,
|
||||
'size': size,
|
||||
'sort': sort_params,
|
||||
'body': body,
|
||||
'_source': field_names
|
||||
}
|
||||
print("Elasticsearch error:", error)
|
||||
raise
|
||||
else:
|
||||
is_scan = True
|
||||
es_results = query_compiler._client.scan(
|
||||
@ -589,7 +601,6 @@ class Operations:
|
||||
df = self._apply_df_post_processing(df, post_processing)
|
||||
collector.collect(df)
|
||||
|
||||
|
||||
def iloc(self, index, field_names):
|
||||
# index and field_names are indexers
|
||||
task = ('iloc', (index, field_names))
|
||||
@ -884,7 +895,7 @@ class Operations:
|
||||
right_field = item[1][1][1][1]
|
||||
|
||||
# https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-api-reference-shared-java-lang.html#painless-api-reference-shared-Math
|
||||
if isinstance(right_field, str):
|
||||
if isinstance(left_field, str) and isinstance(right_field, str):
|
||||
"""
|
||||
(if op_name = '__truediv__')
|
||||
|
||||
@ -913,7 +924,6 @@ class Operations:
|
||||
else:
|
||||
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
|
||||
|
||||
|
||||
if query_params['query_script_fields'] is None:
|
||||
query_params['query_script_fields'] = {}
|
||||
query_params['query_script_fields'][field_name] = {
|
||||
@ -921,7 +931,7 @@ class Operations:
|
||||
'source': source
|
||||
}
|
||||
}
|
||||
else:
|
||||
elif isinstance(left_field, str) and np.issubdtype(np.dtype(type(right_field)), np.number):
|
||||
"""
|
||||
(if op_name = '__truediv__')
|
||||
|
||||
@ -949,18 +959,48 @@ class Operations:
|
||||
source = "doc['{0}'].value - {1}".format(left_field, right_field)
|
||||
else:
|
||||
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
|
||||
elif np.issubdtype(np.dtype(type(left_field)), np.number) and isinstance(right_field, str):
|
||||
"""
|
||||
(if op_name = '__truediv__')
|
||||
|
||||
|
||||
if query_params['query_script_fields'] is None:
|
||||
query_params['query_script_fields'] = {}
|
||||
query_params['query_script_fields'][field_name] = {
|
||||
'script': {
|
||||
'source': source
|
||||
"script_fields": {
|
||||
"field_name": {
|
||||
"script": {
|
||||
"source": "left_field / doc['right_field'].value"
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
if op_name == '__add__':
|
||||
source = "{0} + doc['{1}'].value".format(left_field, right_field)
|
||||
elif op_name == '__truediv__':
|
||||
source = "{0} / doc['{1}'].value".format(left_field, right_field)
|
||||
elif op_name == '__floordiv__':
|
||||
source = "Math.floor({0} / doc['{1}'].value)".format(left_field, right_field)
|
||||
elif op_name == '__pow__':
|
||||
source = "Math.pow({0}, doc['{1}'].value)".format(left_field, right_field)
|
||||
elif op_name == '__mod__':
|
||||
source = "{0} % doc['{1}'].value".format(left_field, right_field)
|
||||
elif op_name == '__mul__':
|
||||
source = "{0} * doc['{1}'].value".format(left_field, right_field)
|
||||
elif op_name == '__sub__':
|
||||
source = "{0} - doc['{1}'].value".format(left_field, right_field)
|
||||
else:
|
||||
raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
|
||||
else:
|
||||
raise TypeError("Types for operation inconsistent {} {} {}", type(left_field), type(right_field), op_name)
|
||||
|
||||
if query_params['query_script_fields'] is None:
|
||||
query_params['query_script_fields'] = {}
|
||||
query_params['query_script_fields'][field_name] = {
|
||||
'script': {
|
||||
'source': source
|
||||
}
|
||||
}
|
||||
|
||||
return query_params, post_processing
|
||||
|
||||
|
||||
def _resolve_post_processing_task(self, item, query_params, post_processing):
|
||||
# Just do this in post-processing
|
||||
if item[0] != 'field_names':
|
||||
@ -968,6 +1008,7 @@ class Operations:
|
||||
|
||||
return query_params, post_processing
|
||||
|
||||
|
||||
def _size(self, query_params, post_processing):
|
||||
# Shrink wrap code around checking if size parameter is set
|
||||
size = query_params['query_size'] # can be None
|
||||
@ -982,6 +1023,7 @@ class Operations:
|
||||
# This can return None
|
||||
return size
|
||||
|
||||
|
||||
def info_es(self, buf):
|
||||
buf.write("Operations:\n")
|
||||
buf.write(" tasks: {0}\n".format(self._tasks))
|
||||
@ -1002,6 +1044,7 @@ class Operations:
|
||||
buf.write(" body: {0}\n".format(body))
|
||||
buf.write(" post_processing: {0}\n".format(post_processing))
|
||||
|
||||
|
||||
def update_query(self, boolean_filter):
|
||||
task = ('boolean_filter', boolean_filter)
|
||||
self._tasks.append(task)
|
||||
|
@ -499,6 +499,7 @@ class Series(NDFrame):
|
||||
"""
|
||||
return self._numeric_op(right, _get_method_name())
|
||||
|
||||
|
||||
def __truediv__(self, right):
|
||||
"""
|
||||
Return floating division of series and right, element-wise (binary operator truediv).
|
||||
@ -528,7 +529,7 @@ class Series(NDFrame):
|
||||
3 2
|
||||
4 2
|
||||
Name: total_quantity, dtype: int64
|
||||
>>> df.taxful_total_price / df.total_quantity
|
||||
>>> df.taxful_total_price / df.total_quantity # doctest: +SKIP
|
||||
0 18.490000
|
||||
1 26.990000
|
||||
2 99.989998
|
||||
@ -733,6 +734,21 @@ class Series(NDFrame):
|
||||
"""
|
||||
return self._numeric_op(right, _get_method_name())
|
||||
|
||||
def __radd__(self, left):
    """
    Return addition of left and series, element-wise (reflected binary operator radd),
    i.e. ``left + self``.

    Delegates to ``_numeric_rop``; the operation name comes from ``_get_method_name()``
    (defined outside this view - presumably it returns the calling method's name).
    """
    method_name = _get_method_name()
    return self._numeric_rop(left, method_name)
|
||||
def __rtruediv__(self, left):
    """
    Return floating division of left and series, element-wise (reflected binary
    operator rtruediv), i.e. ``left / self``.

    Delegates to ``_numeric_rop``; the operation name comes from ``_get_method_name()``
    (defined outside this view - presumably it returns the calling method's name).
    """
    method_name = _get_method_name()
    return self._numeric_rop(left, method_name)
|
||||
def __rfloordiv__(self, left):
    """
    Return integer division of left and series, element-wise (reflected binary
    operator rfloordiv), i.e. ``left // self``.

    Delegates to ``_numeric_rop``; the operation name comes from ``_get_method_name()``
    (defined outside this view - presumably it returns the calling method's name).
    """
    method_name = _get_method_name()
    return self._numeric_rop(left, method_name)
|
||||
def __rmod__(self, left):
    """
    Return modulo of left and series, element-wise (reflected binary operator rmod),
    i.e. ``left % self``.

    Delegates to ``_numeric_rop``; the operation name comes from ``_get_method_name()``
    (defined outside this view - presumably it returns the calling method's name).
    """
    method_name = _get_method_name()
    return self._numeric_rop(left, method_name)
|
||||
def __rmul__(self, left):
    """
    Return multiplication of left and series, element-wise (reflected binary
    operator rmul), i.e. ``left * self``.

    Delegates to ``_numeric_rop``; the operation name comes from ``_get_method_name()``
    (defined outside this view - presumably it returns the calling method's name).
    """
    method_name = _get_method_name()
    return self._numeric_rop(left, method_name)
|
||||
def __rpow__(self, left):
    """
    Return exponential power of left and series, element-wise (reflected binary
    operator rpow), i.e. ``left ** self``.

    Delegates to ``_numeric_rop``; the operation name comes from ``_get_method_name()``
    (defined outside this view - presumably it returns the calling method's name).
    """
    method_name = _get_method_name()
    return self._numeric_rop(left, method_name)
|
||||
def __rsub__(self, left):
    """
    Return subtraction of left and series, element-wise (reflected binary
    operator rsub), i.e. ``left - self``.

    Delegates to ``_numeric_rop``; the operation name comes from ``_get_method_name()``
    (defined outside this view - presumably it returns the calling method's name).
    """
    method_name = _get_method_name()
    return self._numeric_rop(left, method_name)
|
||||
|
||||
add = __add__
|
||||
div = __truediv__
|
||||
divide = __truediv__
|
||||
@ -745,6 +761,18 @@ class Series(NDFrame):
|
||||
subtract = __sub__
|
||||
truediv = __truediv__
|
||||
|
||||
radd = __radd__
|
||||
rdiv = __rtruediv__
|
||||
rdivide = __rtruediv__
|
||||
rfloordiv = __rfloordiv__
|
||||
rmod = __rmod__
|
||||
rmul = __rmul__
|
||||
rmultiply = __rmul__
|
||||
rpow = __rpow__
|
||||
rsub = __rsub__
|
||||
rsubtract = __rsub__
|
||||
rtruediv = __rtruediv__
|
||||
|
||||
def _numeric_op(self, right, method_name):
|
||||
"""
|
||||
return a op b
|
||||
@ -787,6 +815,31 @@ class Series(NDFrame):
|
||||
"unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(right))
|
||||
)
|
||||
|
||||
def _numeric_rop(self, left, method_name):
    """
    Evaluate a reflected arithmetic operation, i.e. ``left <op> self``.

    e.g. 1 + ed.Series

    Parameters
    ----------
    left
        Left-hand operand: either another Series, or a value whose type maps
        to a numpy numeric dtype (Python or numpy numbers).
    method_name
        Name of the reflected method, e.g. ``'__radd__'``. Every ``'__r'`` in
        it is replaced by ``'__'`` to get the forward name (``'__add__'``).

    Returns
    -------
    The result of ``left._numeric_op(self, <forward name>)`` when ``left`` is
    a Series; otherwise a new Series built from
    ``self._query_compiler.arithmetic_op_fields(...)`` whose name is set to
    ``self.name``.

    Raises
    ------
    TypeError
        If ``left`` is neither a Series nor a numeric value.
    """
    # Strip the 'r' from the reflected name: '__radd__' -> '__add__'
    op_method_name = str(method_name).replace('__r', '__')

    if isinstance(left, Series):
        # Both operands are Series: reverse the arguments and reuse the forward op
        return left._numeric_op(self, op_method_name)

    is_number = np.issubdtype(np.dtype(type(left)), np.number)  # allow np types
    if not is_number:
        raise TypeError(
            "unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(left))
        )

    # Prefix new field name with 'f_' so it's a valid ES field name
    left_label = str(left).replace('.', '_')
    new_field_name = "f_{0}_{1}_{2}".format(left_label, op_method_name, self.name)

    # Compatible, so create new Series (the number is the left operand, this series the right)
    query_compiler = self._query_compiler.arithmetic_op_fields(
        new_field_name, op_method_name, left, self.name)
    result = Series(query_compiler=query_compiler)

    # name of Series pinned to valid series (like pandas)
    result.name = self.name

    return result
|
||||
|
||||
def max(self):
|
||||
"""
|
||||
Return the maximum of the Series values
|
||||
|
@ -104,3 +104,39 @@ class TestSeriesArithmetics(TestData):
|
||||
pd_series = getattr(pd_df['total_quantity'], op)(pd_df['currency'])
|
||||
with pytest.raises(TypeError):
|
||||
ed_series = getattr(ed_df['total_quantity'], op)(ed_df['currency'])
|
||||
|
||||
def test_ecommerce_series_basic_rarithmetics(self):
    """
    Check that reflected arithmetic on an eland Series matches pandas.

    Each reflected operator (dunder form and plain-name alias) is applied to
    ``taxful_total_price`` over the first 10 ecommerce rows with four kinds of
    operand: another Series, a Python float, a ``np.float32`` and a Python int.
    """
    pd_df = self.pd_ecommerce().head(10)
    ed_df = self.ed_ecommerce().head(10)

    dunder_ops = ['__radd__',
                  '__rtruediv__',
                  '__rfloordiv__',
                  '__rpow__',
                  '__rmod__',
                  '__rmul__',
                  '__rsub__']
    # Dunder names first, then the same operations by alias ('radd', 'rtruediv', ...)
    ops = dunder_ops + [name.strip('_') for name in dunder_ops]

    for op in ops:
        # (pandas operand, eland operand) pairs; rebuilt for every op so each
        # comparison starts from fresh Series/scalar objects
        operand_pairs = [
            (pd_df['total_quantity'], ed_df['total_quantity']),
            (3.141, 3.141),
            (np.float32(2.879), np.float32(2.879)),
            (int(6), int(6)),
        ]

        for pd_operand, ed_operand in operand_pairs:
            pd_series = getattr(pd_df['taxful_total_price'], op)(pd_operand)
            ed_series = getattr(ed_df['taxful_total_price'], op)(ed_operand)
            assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
|
||||
|
Loading…
x
Reference in New Issue
Block a user