mirror of
https://github.com/elastic/eland.git
synced 2025-07-11 00:02:14 +08:00
Updates based on PR review.
This commit is contained in:
parent
e755a2e160
commit
ac8cb302de
@ -383,7 +383,8 @@ class DataFrame(NDFrame):
|
|||||||
tasks: [('boolean_filter', {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}), ('field_names', ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']), ('tail', ('_doc', 5))]
|
tasks: [('boolean_filter', {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}), ('field_names', ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']), ('tail', ('_doc', 5))]
|
||||||
size: 5
|
size: 5
|
||||||
sort_params: _doc:desc
|
sort_params: _doc:desc
|
||||||
field_names: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']
|
_source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']
|
||||||
|
body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}, 'aggs': {}}
|
||||||
post_processing: ['sort_index']
|
post_processing: ['sort_index']
|
||||||
<BLANKLINE>
|
<BLANKLINE>
|
||||||
"""
|
"""
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype,
|
from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype,
|
||||||
is_string_dtype)
|
is_string_dtype)
|
||||||
@ -493,13 +494,14 @@ class Mappings:
|
|||||||
Returns
|
Returns
|
||||||
-------
|
-------
|
||||||
dtypes: pd.Series
|
dtypes: pd.Series
|
||||||
Source field name + pd_dtype
|
Source field name + pd_dtype as np.dtype
|
||||||
"""
|
"""
|
||||||
if field_names is not None:
|
if field_names is not None:
|
||||||
return pd.Series(
|
return pd.Series(
|
||||||
{key: self._source_field_pd_dtypes[key] for key in field_names})
|
{key: np.dtype(self._source_field_pd_dtypes[key]) for key in field_names})
|
||||||
|
|
||||||
return pd.Series(self._source_field_pd_dtypes)
|
return pd.Series(
|
||||||
|
{key: np.dtype(value) for key, value in self._source_field_pd_dtypes.items()})
|
||||||
|
|
||||||
def info_es(self, buf):
|
def info_es(self, buf):
|
||||||
buf.write("Mappings:\n")
|
buf.write("Mappings:\n")
|
||||||
|
@ -77,7 +77,7 @@ class Operations:
|
|||||||
def set_field_names(self, field_names):
|
def set_field_names(self, field_names):
|
||||||
# Setting field_names at different phases of the task list may result in different
|
# Setting field_names at different phases of the task list may result in different
|
||||||
# operations. So instead of setting field_names once, set when it happens in call chain
|
# operations. So instead of setting field_names once, set when it happens in call chain
|
||||||
if type(field_names) is not list:
|
if not isinstance(field_names, list):
|
||||||
field_names = list(field_names)
|
field_names = list(field_names)
|
||||||
|
|
||||||
# TODO - field_name renaming
|
# TODO - field_name renaming
|
||||||
@ -538,6 +538,7 @@ class Operations:
|
|||||||
|
|
||||||
return collector.ret
|
return collector.ret
|
||||||
|
|
||||||
|
|
||||||
def _es_results(self, query_compiler, collector):
|
def _es_results(self, query_compiler, collector):
|
||||||
query_params, post_processing = self._resolve_tasks()
|
query_params, post_processing = self._resolve_tasks()
|
||||||
|
|
||||||
@ -989,9 +990,16 @@ class Operations:
|
|||||||
size, sort_params = Operations._query_params_to_size_and_sort(query_params)
|
size, sort_params = Operations._query_params_to_size_and_sort(query_params)
|
||||||
field_names = self.get_field_names()
|
field_names = self.get_field_names()
|
||||||
|
|
||||||
|
script_fields = query_params['query_script_fields']
|
||||||
|
query = Query(query_params['query'])
|
||||||
|
body = query.to_search_body()
|
||||||
|
if script_fields is not None:
|
||||||
|
body['script_fields'] = script_fields
|
||||||
|
|
||||||
buf.write(" size: {0}\n".format(size))
|
buf.write(" size: {0}\n".format(size))
|
||||||
buf.write(" sort_params: {0}\n".format(sort_params))
|
buf.write(" sort_params: {0}\n".format(sort_params))
|
||||||
buf.write(" field_names: {0}\n".format(field_names))
|
buf.write(" _source: {0}\n".format(field_names))
|
||||||
|
buf.write(" body: {0}\n".format(body))
|
||||||
buf.write(" post_processing: {0}\n".format(post_processing))
|
buf.write(" post_processing: {0}\n".format(post_processing))
|
||||||
|
|
||||||
def update_query(self, boolean_filter):
|
def update_query(self, boolean_filter):
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
from pandas.core.dtypes.common import (
|
import numpy as np
|
||||||
is_list_like
|
|
||||||
)
|
|
||||||
|
|
||||||
from eland import Client
|
from eland import Client
|
||||||
from eland import Index
|
from eland import Index
|
||||||
@ -300,6 +298,15 @@ class ElandQueryCompiler:
|
|||||||
else:
|
else:
|
||||||
out[field_name] = x
|
out[field_name] = x
|
||||||
else:
|
else:
|
||||||
|
# Script fields end up here
|
||||||
|
|
||||||
|
# Elasticsearch returns 'Infinity' as a string for np.inf values.
|
||||||
|
# Map this to a numeric value to avoid this whole Series being classed as an object
|
||||||
|
# TODO - create a lookup for script fields and dtypes to only map 'Infinity'
|
||||||
|
# if the field is numeric. This implementation will currently map
|
||||||
|
# any script field with "Infinity" as a string to np.inf
|
||||||
|
if x == 'Infinity':
|
||||||
|
x = np.inf
|
||||||
out[name[:-1]] = x
|
out[name[:-1]] = x
|
||||||
|
|
||||||
flatten(y)
|
flatten(y)
|
||||||
@ -602,4 +609,3 @@ class ElandQueryCompiler:
|
|||||||
field_to_display_names=self._field_to_display_names.copy(),
|
field_to_display_names=self._field_to_display_names.copy(),
|
||||||
display_to_field_names=self._display_to_field_names.copy()
|
display_to_field_names=self._display_to_field_names.copy()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -19,6 +19,8 @@ import sys
|
|||||||
import warnings
|
import warnings
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from pandas.io.common import _expand_user, _stringify_path
|
from pandas.io.common import _expand_user, _stringify_path
|
||||||
|
|
||||||
@ -140,7 +142,9 @@ class Series(NDFrame):
|
|||||||
def rename(self, new_name):
|
def rename(self, new_name):
|
||||||
"""
|
"""
|
||||||
Rename name of series. Only column rename is supported. This does not change the underlying
|
Rename name of series. Only column rename is supported. This does not change the underlying
|
||||||
Elasticsearch index, but adds a soft link from the new name (column) to the Elasticsearch field name
|
Elasticsearch index, but adds a symbolic link from the new name (column) to the Elasticsearch field name.
|
||||||
|
|
||||||
|
For instance, if a field was called 'tot_quan' it could be renamed 'Total Quantity'.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
@ -358,6 +362,11 @@ class Series(NDFrame):
|
|||||||
def _to_pandas(self):
|
def _to_pandas(self):
|
||||||
return self._query_compiler.to_pandas()[self.name]
|
return self._query_compiler.to_pandas()[self.name]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _dtype(self):
|
||||||
|
# DO NOT MAKE PUBLIC (i.e. def dtype) as this breaks query eval implementation
|
||||||
|
return self._query_compiler.dtypes[0]
|
||||||
|
|
||||||
def __gt__(self, other):
|
def __gt__(self, other):
|
||||||
if isinstance(other, Series):
|
if isinstance(other, Series):
|
||||||
# Need to use scripted query to compare to values
|
# Need to use scripted query to compare to values
|
||||||
@ -745,9 +754,15 @@ class Series(NDFrame):
|
|||||||
a == Series, b == numeric
|
a == Series, b == numeric
|
||||||
"""
|
"""
|
||||||
if isinstance(right, Series):
|
if isinstance(right, Series):
|
||||||
# Check compatibility
|
# Check compatibility of Elasticsearch cluster
|
||||||
self._query_compiler.check_arithmetics(right._query_compiler)
|
self._query_compiler.check_arithmetics(right._query_compiler)
|
||||||
|
|
||||||
|
# Check compatibility of dtypes
|
||||||
|
# either not a number?
|
||||||
|
if not (np.issubdtype(self._dtype, np.number) and np.issubdtype(right._dtype, np.number)):
|
||||||
|
# TODO - support limited ops on strings https://github.com/elastic/eland/issues/65
|
||||||
|
raise TypeError("Unsupported operation: '{}' {} '{}'".format(self._dtype, method_name, right._dtype))
|
||||||
|
|
||||||
new_field_name = "{0}_{1}_{2}".format(self.name, method_name, right.name)
|
new_field_name = "{0}_{1}_{2}".format(self.name, method_name, right.name)
|
||||||
|
|
||||||
# Compatible, so create new Series
|
# Compatible, so create new Series
|
||||||
@ -756,12 +771,12 @@ class Series(NDFrame):
|
|||||||
series.name = None
|
series.name = None
|
||||||
|
|
||||||
return series
|
return series
|
||||||
elif isinstance(right, (int, float)): # TODO extend to numpy types
|
elif np.issubdtype(np.dtype(type(right)), np.number): # allow np types
|
||||||
new_field_name = "{0}_{1}_{2}".format(self.name, method_name, str(right).replace('.', '_'))
|
new_field_name = "{0}_{1}_{2}".format(self.name, method_name, str(right).replace('.', '_'))
|
||||||
|
|
||||||
# Compatible, so create new Series
|
# Compatible, so create new Series
|
||||||
series = Series(query_compiler=self._query_compiler.arithmetic_op_fields(
|
series = Series(query_compiler=self._query_compiler.arithmetic_op_fields(
|
||||||
new_field_name, method_name, self.name, float(right))) # force rhs to float
|
new_field_name, method_name, self.name, right))
|
||||||
|
|
||||||
# name of Series remains original name
|
# name of Series remains original name
|
||||||
series.name = self.name
|
series.name = self.name
|
||||||
@ -769,8 +784,7 @@ class Series(NDFrame):
|
|||||||
return series
|
return series
|
||||||
else:
|
else:
|
||||||
raise TypeError(
|
raise TypeError(
|
||||||
"Can only perform arithmetic operation on selected types "
|
"unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(right))
|
||||||
"{0} != {1} for {2}".format(type(self), type(right), method_name)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def max(self):
|
def max(self):
|
||||||
|
@ -16,6 +16,9 @@ class TestDataFrameDtypes(TestData):
|
|||||||
|
|
||||||
assert_series_equal(pd_flights.dtypes, ed_flights.dtypes)
|
assert_series_equal(pd_flights.dtypes, ed_flights.dtypes)
|
||||||
|
|
||||||
|
for i in range(0, len(pd_flights.dtypes)-1):
|
||||||
|
assert type(pd_flights.dtypes[i]) == type(ed_flights.dtypes[i])
|
||||||
|
|
||||||
def test_flights_select_dtypes(self):
|
def test_flights_select_dtypes(self):
|
||||||
ed_flights = self.ed_flights_small()
|
ed_flights = self.ed_flights_small()
|
||||||
pd_flights = self.pd_flights_small()
|
pd_flights = self.pd_flights_small()
|
||||||
|
@ -1,11 +1,10 @@
|
|||||||
# File called _pytest for PyCharm compatability
|
# File called _pytest for PyCharm compatability
|
||||||
import eland as ed
|
|
||||||
from eland.tests.common import TestData, assert_pandas_eland_series_equal
|
|
||||||
from pandas.util.testing import assert_series_equal
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
from eland.tests.common import TestData, assert_pandas_eland_series_equal
|
||||||
|
|
||||||
|
|
||||||
class TestSeriesArithmetics(TestData):
|
class TestSeriesArithmetics(TestData):
|
||||||
|
|
||||||
@ -45,7 +44,63 @@ class TestSeriesArithmetics(TestData):
|
|||||||
ed_series = getattr(ed_df['taxful_total_price'], op)(10.56)
|
ed_series = getattr(ed_df['taxful_total_price'], op)(10.56)
|
||||||
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
|
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
|
||||||
|
|
||||||
|
pd_series = getattr(pd_df['taxful_total_price'], op)(np.float32(1.879))
|
||||||
|
ed_series = getattr(ed_df['taxful_total_price'], op)(np.float32(1.879))
|
||||||
|
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
|
||||||
|
|
||||||
pd_series = getattr(pd_df['taxful_total_price'], op)(int(8))
|
pd_series = getattr(pd_df['taxful_total_price'], op)(int(8))
|
||||||
ed_series = getattr(ed_df['taxful_total_price'], op)(int(8))
|
ed_series = getattr(ed_df['taxful_total_price'], op)(int(8))
|
||||||
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
|
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
|
||||||
|
|
||||||
|
def test_supported_series_dtypes_ops(self):
|
||||||
|
pd_df = self.pd_ecommerce().head(100)
|
||||||
|
ed_df = self.ed_ecommerce().head(100)
|
||||||
|
|
||||||
|
# Test some specific operations that are and aren't supported
|
||||||
|
numeric_ops = ['__add__',
|
||||||
|
'__truediv__',
|
||||||
|
'__floordiv__',
|
||||||
|
'__pow__',
|
||||||
|
'__mod__',
|
||||||
|
'__mul__',
|
||||||
|
'__sub__']
|
||||||
|
|
||||||
|
non_string_numeric_ops = ['__add__',
|
||||||
|
'__truediv__',
|
||||||
|
'__floordiv__',
|
||||||
|
'__pow__',
|
||||||
|
'__mod__',
|
||||||
|
'__sub__']
|
||||||
|
# __mul__ is supported for int * str in pandas
|
||||||
|
|
||||||
|
# float op float
|
||||||
|
for op in numeric_ops:
|
||||||
|
pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['taxless_total_price'])
|
||||||
|
ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['taxless_total_price'])
|
||||||
|
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
|
||||||
|
|
||||||
|
# int op float
|
||||||
|
for op in numeric_ops:
|
||||||
|
pd_series = getattr(pd_df['total_quantity'], op)(pd_df['taxless_total_price'])
|
||||||
|
ed_series = getattr(ed_df['total_quantity'], op)(ed_df['taxless_total_price'])
|
||||||
|
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
|
||||||
|
|
||||||
|
# float op int
|
||||||
|
for op in numeric_ops:
|
||||||
|
pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity'])
|
||||||
|
ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity'])
|
||||||
|
assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True)
|
||||||
|
|
||||||
|
# str op int (throws)
|
||||||
|
for op in non_string_numeric_ops:
|
||||||
|
with pytest.raises(TypeError):
|
||||||
|
pd_series = getattr(pd_df['currency'], op)(pd_df['total_quantity'])
|
||||||
|
with pytest.raises(TypeError):
|
||||||
|
ed_series = getattr(ed_df['currency'], op)(ed_df['total_quantity'])
|
||||||
|
|
||||||
|
# int op str (throws)
|
||||||
|
for op in non_string_numeric_ops:
|
||||||
|
with pytest.raises(TypeError):
|
||||||
|
pd_series = getattr(pd_df['total_quantity'], op)(pd_df['currency'])
|
||||||
|
with pytest.raises(TypeError):
|
||||||
|
ed_series = getattr(ed_df['total_quantity'], op)(ed_df['currency'])
|
||||||
|
Loading…
x
Reference in New Issue
Block a user