From c12bf9357b593a62a14c70bd5c9ae290d5f2de2d Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Thu, 21 Nov 2019 15:39:13 +0000 Subject: [PATCH 1/9] Series rename and arithmetic initial implementation Partially implemented, tests fail with this commit. --- eland/dataframe.py | 2 +- eland/mappings.py | 54 ++--- eland/ndframe.py | 4 +- eland/operations.py | 163 ++++++++----- eland/query.py | 11 + eland/query_compiler.py | 220 +++++++++++++++--- eland/series.py | 50 +++- eland/tests/client/__init__.py | 0 eland/tests/client/test_eq_pytest.py | 28 +++ eland/tests/common.py | 4 +- .../mappings/test_aggregatables_pytest.py | 4 +- eland/tests/mappings/test_dtypes_pytest.py | 2 +- .../test_numeric_source_fields_pytest.py | 26 +-- .../query_compiler/test_rename_pytest.py | 75 ++++++ eland/tests/series/test_arithmetics_pytest.py | 50 ++++ eland/tests/series/test_rename_pytest.py | 23 ++ 16 files changed, 582 insertions(+), 134 deletions(-) create mode 100644 eland/tests/client/__init__.py create mode 100644 eland/tests/client/test_eq_pytest.py create mode 100644 eland/tests/query_compiler/test_rename_pytest.py create mode 100644 eland/tests/series/test_arithmetics_pytest.py create mode 100644 eland/tests/series/test_rename_pytest.py diff --git a/eland/dataframe.py b/eland/dataframe.py index 98d6dab..852e06a 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -43,7 +43,7 @@ class DataFrame(NDFrame): - elasticsearch-py instance or - eland.Client instance index_pattern: str - Elasticsearch index pattern (e.g. 'flights' or 'filebeat-\*') + Elasticsearch index pattern (e.g. 'flights' or 'filebeat-*') columns: list of str, optional List of DataFrame columns. A subset of the Elasticsearch index's fields. index_field: str, optional diff --git a/eland/mappings.py b/eland/mappings.py index a4457f9..48140eb 100644 --- a/eland/mappings.py +++ b/eland/mappings.py @@ -182,7 +182,7 @@ class Mappings: """ all_fields_caps_fields = all_fields_caps['fields'] - columns = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable'] + field_names = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable'] capability_matrix = {} for field, field_caps in all_fields_caps_fields.items(): @@ -208,7 +208,7 @@ class Mappings: format(field, vv['non_searchable_indices']), UserWarning) - capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=columns) + capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=field_names) return capability_matrix_df.sort_index() @@ -325,14 +325,14 @@ class Mappings: mappings = {} mappings['properties'] = {} - for column_name, dtype in dataframe.dtypes.iteritems(): - if geo_points is not None and column_name in geo_points: + for field_name_name, dtype in dataframe.dtypes.iteritems(): + if geo_points is not None and field_name_name in geo_points: es_dtype = 'geo_point' else: es_dtype = Mappings._pd_dtype_to_es_dtype(dtype) - mappings['properties'][column_name] = {} - mappings['properties'][column_name]['type'] = es_dtype + mappings['properties'][field_name_name] = {} + mappings['properties'][field_name_name]['type'] = es_dtype return {"mappings": mappings} @@ -407,12 +407,12 @@ class Mappings: return is_source_field - def aggregatable_columns(self, columns=None): + def aggregatable_field_names(self, field_names=None): """ - Return a dict of aggregatable columns from all columns or columns list + Return a dict of aggregatable field_names from all field_names or field_names list {'customer_full_name': 'customer_full_name.keyword', ...} - Logic here is that column names are '_source' fields and keyword fields + Logic here is that field_name names are '_source' fields and keyword fields may be nested beneath the field. E.g. customer_full_name: text customer_full_name.keyword: keyword @@ -424,28 +424,28 @@ class Mappings: dict e.g. {'customer_full_name': 'customer_full_name.keyword', ...} """ - if columns is None: - columns = self.source_fields() + if field_names is None: + field_names = self.source_fields() aggregatables = {} - for column in columns: - capabilities = self.field_capabilities(column) + for field_name in field_names: + capabilities = self.field_capabilities(field_name) if capabilities['aggregatable']: - aggregatables[column] = column + aggregatables[field_name] = field_name else: - # Try 'column.keyword' - column_keyword = column + '.keyword' - capabilities = self.field_capabilities(column_keyword) + # Try 'field_name.keyword' + field_name_keyword = field_name + '.keyword' + capabilities = self.field_capabilities(field_name_keyword) if capabilities['aggregatable']: - aggregatables[column_keyword] = column + aggregatables[field_name_keyword] = field_name else: # Aggregations not supported for this field - raise ValueError("Aggregations not supported for ", column) + raise ValueError("Aggregations not supported for ", field_name) return aggregatables - def numeric_source_fields(self, columns, include_bool=True): + def numeric_source_fields(self, field_names, include_bool=True): """ Returns ------- @@ -461,10 +461,10 @@ class Mappings: df = self._mappings_capabilities[(self._mappings_capabilities._source == True) & ((self._mappings_capabilities.pd_dtype == 'int64') | (self._mappings_capabilities.pd_dtype == 'float64'))] - # if columns exists, filter index with columns - if columns is not None: - # reindex adds NA for non-existing columns (non-numeric), so drop these after reindex - df = df.reindex(columns) + # if field_names exists, filter index with field_names + if field_names is not None: + # reindex adds NA for non-existing field_names (non-numeric), so drop these after reindex + df = df.reindex(field_names) df.dropna(inplace=True) # return as list @@ -488,16 +488,16 @@ class Mappings: """ return len(self.source_fields()) - def dtypes(self, columns=None): + def dtypes(self, field_names=None): """ Returns ------- dtypes: pd.Series Source field name + pd_dtype """ - if columns is not None: + if field_names is not None: return pd.Series( - {key: self._source_field_pd_dtypes[key] for key in columns}) + {key: self._source_field_pd_dtypes[key] for key in field_names}) return pd.Series(self._source_field_pd_dtypes) diff --git a/eland/ndframe.py b/eland/ndframe.py index 31a2c40..c98dd22 100644 --- a/eland/ndframe.py +++ b/eland/ndframe.py @@ -49,9 +49,7 @@ class NDFrame: A reference to a Elasticsearch python client """ if query_compiler is None: - query_compiler = ElandQueryCompiler(client=client, - index_pattern=index_pattern, - columns=columns, + query_compiler = ElandQueryCompiler(client=client, index_pattern=index_pattern, field_names=columns, index_field=index_field) self._query_compiler = query_compiler diff --git a/eland/operations.py b/eland/operations.py index 7f69446..aa9ba2f 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -12,7 +12,7 @@ class Operations: A collector of the queries and selectors we apply to queries to return the appropriate results. For example, - - a list of the columns in the DataFrame (a subset of columns in the index) + - a list of the field_names in the DataFrame (a subset of field_names in the index) - a size limit on the results (e.g. for head(n=5)) - a query to filter the results (e.g. df.A > 10) @@ -66,26 +66,34 @@ class Operations: task = ('tail', (index.sort_field, n)) self._tasks.append(task) - def set_columns(self, columns): - # Setting columns at different phases of the task list may result in different - # operations. So instead of setting columns once, set when it happens in call chain - if type(columns) is not list: - columns = list(columns) + def arithmetic_op_fields(self, field_name, op_name, left_field, right_field): + task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field)))) + + # Set this as a column we want to retrieve + self.set_field_names([field_name]) - # TODO - column renaming - # TODO - validate we are setting columns to a subset of last columns? - task = ('columns', columns) self._tasks.append(task) - # Iterate backwards through task list looking for last 'columns' task + + def set_field_names(self, field_names): + # Setting field_names at different phases of the task list may result in different + # operations. So instead of setting field_names once, set when it happens in call chain + if type(field_names) is not list: + field_names = list(field_names) + + # TODO - field_name renaming + # TODO - validate we are setting field_names to a subset of last field_names? + task = ('field_names', field_names) + self._tasks.append(task) + # Iterate backwards through task list looking for last 'field_names' task for task in reversed(self._tasks): - if task[0] == 'columns': + if task[0] == 'field_names': return task[1] return None - def get_columns(self): - # Iterate backwards through task list looking for last 'columns' task + def get_field_names(self): + # Iterate backwards through task list looking for last 'field_names' task for task in reversed(self._tasks): - if task[0] == 'columns': + if task[0] == 'field_names': return task[1] return None @@ -103,8 +111,8 @@ class Operations: "not supported {0} {1}" .format(query_params, post_processing)) - # Only return requested columns - fields = query_compiler.columns + # Only return requested field_names + fields = query_compiler.field_names counts = {} for field in fields: @@ -143,13 +151,13 @@ class Operations: Parameters ---------- field_types: str, default None - if `aggregatable` use only columns whose fields in elasticseach are aggregatable. + if `aggregatable` use only field_names whose fields in elasticseach are aggregatable. If `None`, use only numeric fields. Returns ------- pandas.Series - Series containing results of `func` applied to the column(s) + Series containing results of `func` applied to the field_name(s) """ query_params, post_processing = self._resolve_tasks() @@ -157,7 +165,7 @@ class Operations: if size is not None: raise NotImplementedError("Can not count field matches if size is set {}".format(size)) - columns = self.get_columns() + field_names = self.get_field_names() body = Query(query_params['query']) @@ -165,9 +173,9 @@ class Operations: # therefore we include an optional all parameter on operations # that call _metric_aggs if field_types=='aggregatable': - source_fields = query_compiler._mappings.aggregatable_columns(columns) + source_fields = query_compiler._mappings.aggregatable_field_names(field_names) else: - source_fields = query_compiler._mappings.numeric_source_fields(columns) + source_fields = query_compiler._mappings.numeric_source_fields(field_names) for field in source_fields: body.metric_aggs(field, func, field) @@ -209,7 +217,7 @@ class Operations: Returns ------- pandas.Series - Series containing results of `func` applied to the column(s) + Series containing results of `func` applied to the field_name(s) """ query_params, post_processing = self._resolve_tasks() @@ -217,14 +225,14 @@ class Operations: if size is not None: raise NotImplementedError("Can not count field matches if size is set {}".format(size)) - columns = self.get_columns() + field_names = self.get_field_names() - # Get just aggregatable columns - aggregatable_columns = query_compiler._mappings.aggregatable_columns(columns) + # Get just aggregatable field_names + aggregatable_field_names = query_compiler._mappings.aggregatable_field_names(field_names) body = Query(query_params['query']) - for field in aggregatable_columns.keys(): + for field in aggregatable_field_names.keys(): body.terms_aggs(field, func, field, es_size=es_size) response = query_compiler._client.search( @@ -234,12 +242,12 @@ class Operations: results = {} - for key, value in aggregatable_columns.items(): - for bucket in response['aggregations'][columns[0]]['buckets']: + for key, value in aggregatable_field_names.items(): + for bucket in response['aggregations'][field_names[0]]['buckets']: results[bucket['key']] = bucket['doc_count'] try: - name = columns[0] + name = field_names[0] except IndexError: name = None @@ -248,16 +256,16 @@ class Operations: return s def _hist_aggs(self, query_compiler, num_bins): - # Get histogram bins and weights for numeric columns + # Get histogram bins and weights for numeric field_names query_params, post_processing = self._resolve_tasks() size = self._size(query_params, post_processing) if size is not None: raise NotImplementedError("Can not count field matches if size is set {}".format(size)) - columns = self.get_columns() + field_names = self.get_field_names() - numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns) + numeric_source_fields = query_compiler._mappings.numeric_source_fields(field_names) body = Query(query_params['query']) @@ -331,7 +339,7 @@ class Operations: Pandas supports a lot of options here, and these options generally work on text and numerics in pandas. Elasticsearch has metric aggs and terms aggs so will have different behaviour. - Pandas aggs that return columns (as opposed to transformed rows): + Pandas aggs that return field_names (as opposed to transformed rows): all any @@ -398,14 +406,14 @@ class Operations: if size is not None: raise NotImplementedError("Can not count field matches if size is set {}".format(size)) - columns = self.get_columns() + field_names = self.get_field_names() body = Query(query_params['query']) # convert pandas aggs to ES equivalent es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs) - for field in columns: + for field in field_names: for es_agg in es_aggs: # If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call if isinstance(es_agg, tuple): @@ -427,7 +435,7 @@ class Operations: """ results = {} - for field in columns: + for field in field_names: values = list() for es_agg in es_aggs: if isinstance(es_agg, tuple): @@ -448,9 +456,9 @@ class Operations: if size is not None: raise NotImplementedError("Can not count field matches if size is set {}".format(size)) - columns = self.get_columns() + field_names = self.get_field_names() - numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns, include_bool=False) + numeric_source_fields = query_compiler._mappings.numeric_source_fields(field_names, include_bool=False) # for each field we compute: # count, mean, std, min, 25%, 50%, 75%, max @@ -535,10 +543,15 @@ class Operations: size, sort_params = Operations._query_params_to_size_and_sort(query_params) - body = Query(query_params['query']) + script_fields = query_params['query_script_fields'] + query = Query(query_params['query']) - # Only return requested columns - columns = self.get_columns() + body = query.to_search_body() + if script_fields is not None: + body['script_fields'] = script_fields + + # Only return requested field_names + field_names = self.get_field_names() es_results = None @@ -551,14 +564,14 @@ class Operations: index=query_compiler._index_pattern, size=size, sort=sort_params, - body=body.to_search_body(), - _source=columns) + body=body, + _source=field_names) else: is_scan = True es_results = query_compiler._client.scan( index=query_compiler._index_pattern, - query=body.to_search_body(), - _source=columns) + query=body, + _source=field_names) # create post sort if sort_params is not None: post_processing.append(self._sort_params_to_postprocessing(sort_params)) @@ -575,9 +588,9 @@ class Operations: df = self._apply_df_post_processing(df, post_processing) collector.collect(df) - def iloc(self, index, columns): - # index and columns are indexers - task = ('iloc', (index, columns)) + def iloc(self, index, field_names): + # index and field_names are indexers + task = ('iloc', (index, field_names)) self._tasks.append(task) def index_count(self, query_compiler, field): @@ -691,13 +704,13 @@ class Operations: df = df.sort_values(sort_field, False) elif action[0] == 'iloc': index_indexer = action[1][0] - column_indexer = action[1][1] + field_name_indexer = action[1][1] if index_indexer is None: index_indexer = slice(None) - if column_indexer is None: - column_indexer = slice(None) - df = df.iloc[index_indexer, column_indexer] - # columns could be in here (and we ignore it) + if field_name_indexer is None: + field_name_indexer = slice(None) + df = df.iloc[index_indexer, field_name_indexer] + # field_names could be in here (and we ignore it) return df @@ -710,6 +723,7 @@ class Operations: "query_sort_order": None, "query_size": None, "query_fields": None, + "query_script_fields": None, "query": Query()} post_processing = [] @@ -727,6 +741,8 @@ class Operations: query_params, post_processing = self._resolve_query_terms(task, query_params, post_processing) elif task[0] == 'boolean_filter': query_params, post_processing = self._resolve_boolean_filter(task, query_params, post_processing) + elif task[0] == 'arithmetic_op_fields': + query_params, post_processing = self._resolve_arithmetic_op_fields(task, query_params, post_processing) else: # a lot of operations simply post-process the dataframe - put these straight through query_params, post_processing = self._resolve_post_processing_task(task, query_params, post_processing) @@ -858,9 +874,44 @@ class Operations: return query_params, post_processing + def _resolve_arithmetic_op_fields(self, item, query_params, post_processing): + # task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field)))) + field_name = item[1][0] + op_name = item[1][1][0] + left_field = item[1][1][1][0] + right_field = item[1][1][1][1] + + """ + (if op_name = 'truediv') + + "script_fields": { + "field_name": { + "script": { + "source": "doc[left_field].value / doc[right_field].value" + } + } + } + """ + if op_name == 'truediv': + op = '/' + else: + raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) + + source = "doc['{0}'].value {1} doc['{2}'].value".format(left_field, op, right_field) + + if query_params['query_script_fields'] is None: + query_params['query_script_fields'] = {} + query_params['query_script_fields'][field_name] = { + 'script': { + 'source': source + } + } + + return query_params, post_processing + def _resolve_post_processing_task(self, item, query_params, post_processing): # Just do this in post-processing - if item[0] != 'columns': + if item[0] != 'field_names': post_processing.append(item) return query_params, post_processing @@ -885,11 +936,11 @@ class Operations: query_params, post_processing = self._resolve_tasks() size, sort_params = Operations._query_params_to_size_and_sort(query_params) - columns = self.get_columns() + field_names = self.get_field_names() buf.write(" size: {0}\n".format(size)) buf.write(" sort_params: {0}\n".format(sort_params)) - buf.write(" columns: {0}\n".format(columns)) + buf.write(" field_names: {0}\n".format(field_names)) buf.write(" post_processing: {0}\n".format(post_processing)) def update_query(self, boolean_filter): diff --git a/eland/query.py b/eland/query.py index 80a5161..72e9129 100644 --- a/eland/query.py +++ b/eland/query.py @@ -15,10 +15,12 @@ class Query: def __init__(self, query=None): if query is None: self._query = BooleanFilter() + self._script_fields = {} self._aggs = {} else: # Deep copy the incoming query so we can change it self._query = deepcopy(query._query) + self._script_fields = deepcopy(query._script_fields) self._aggs = deepcopy(query._aggs) def exists(self, field, must=True): @@ -157,5 +159,14 @@ class Query: else: self._query = self._query & boolean_filter + def arithmetic_op_fields(self, op_name, left_field, right_field): + if self._script_fields.empty(): + body = None + else: + body = {"query": self._script_fields.build()} + + return body + def __repr__(self): return repr(self.to_search_body()) + diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 102221f..e94e605 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -38,12 +38,8 @@ class ElandQueryCompiler: A way to mitigate this would be to post process this drop - TODO """ - def __init__(self, - client=None, - index_pattern=None, - columns=None, - index_field=None, - operations=None): + def __init__(self, client=None, index_pattern=None, field_names=None, index_field=None, operations=None, + name_mapper=None): self._client = Client(client) self._index_pattern = index_pattern @@ -58,29 +54,53 @@ class ElandQueryCompiler: else: self._operations = operations - if columns is not None: - self.columns = columns + if field_names is not None: + self.field_names = field_names + + if name_mapper is None: + self._name_mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper() + else: + self._name_mapper = name_mapper def _get_index(self): return self._index + def _get_field_names(self): + field_names = self._operations.get_field_names() + if field_names is None: + # default to all + field_names = self._mappings.source_fields() + + return pd.Index(field_names) + + def _set_field_names(self, field_names): + self._operations.set_field_names(field_names) + + field_names = property(_get_field_names, _set_field_names) + def _get_columns(self): - columns = self._operations.get_columns() + columns = self._operations.get_field_names() if columns is None: # default to all columns = self._mappings.source_fields() + # map renames + columns = self._name_mapper.field_to_display_names(columns) + return pd.Index(columns) def _set_columns(self, columns): - self._operations.set_columns(columns) + # map renames + columns = self._name_mapper.display_to_field_names(columns) + + self._operations.set_field_names(columns) columns = property(_get_columns, _set_columns) index = property(_get_index) @property def dtypes(self): - columns = self._operations.get_columns() + columns = self._operations.get_field_names() return self._mappings.dtypes(columns) @@ -194,6 +214,12 @@ class ElandQueryCompiler: row = hit['_source'] + # script_fields appear in 'fields' + if 'fields' in hit: + fields = hit['fields'] + for key, value in fields.items(): + row[key] = value + # get index value - can be _id or can be field value in source if self._index.is_source_field: index_field = row[self._index.index_field] @@ -221,6 +247,10 @@ class ElandQueryCompiler: is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing) df[missing] = pd.Series(dtype=pd_dtype) + # Rename columns + if not self._name_mapper.empty: + df.rename(columns=self._name_mapper.display_names_mapper(), inplace=True) + # Sort columns in mapping order df = df[self.columns] @@ -267,6 +297,8 @@ class ElandQueryCompiler: out[field_name].append(x) else: out[field_name] = x + else: + out[name[:-1]] = x flatten(y) @@ -307,13 +339,16 @@ class ElandQueryCompiler: return df def copy(self): - return ElandQueryCompiler( - client=self._client, - index_pattern=self._index_pattern, - columns=None, # columns are embedded in operations - index_field=self._index.index_field, - operations=self._operations.copy() - ) + return ElandQueryCompiler(client=self._client, index_pattern=self._index_pattern, field_names=None, + index_field=self._index.index_field, operations=self._operations.copy(), + name_mapper=self._name_mapper.copy()) + + def rename(self, renames): + result = self.copy() + + result._name_mapper.rename_display_name(renames) + + return result def head(self, n): result = self.copy() @@ -364,14 +399,7 @@ class ElandQueryCompiler: if numeric: raise NotImplementedError("Not implemented yet...") - result._operations.set_columns(list(key)) - - return result - - def view(self, index=None, columns=None): - result = self.copy() - - result._operations.iloc(index, columns) + result._operations.set_field_names(list(key)) return result @@ -382,7 +410,7 @@ class ElandQueryCompiler: if columns is not None: # columns is a pandas.Index so we can use pandas drop feature new_columns = self.columns.drop(columns) - result._operations.set_columns(new_columns.to_list()) + result._operations.set_field_names(new_columns.to_list()) if index is not None: result._operations.drop_index_values(self, self.index.index_field, index) @@ -433,3 +461,141 @@ class ElandQueryCompiler: return result + def check_arithmetics(self, right): + """ + Compare 2 query_compilers to see if arithmetic operations can be performed by the NDFrame object. + + This does very basic comparisons and ignores some of the complexities of incompatible task lists + + Raises exception if incompatible + + Parameters + ---------- + right: ElandQueryCompiler + The query compiler to compare self to + + Raises + ------ + TypeError, ValueError + If arithmetic operations aren't possible + """ + if not isinstance(right, ElandQueryCompiler): + raise TypeError( + "Incompatible types " + "{0} != {1}".format(type(self), type(right)) + ) + + if self._client._es != right._client._es: + raise ValueError( + "Can not perform arithmetic operations across different clients" + "{0} != {1}".format(self._client._es, right._client._es) + ) + + if self._index.index_field != right._index.index_field: + raise ValueError( + "Can not perform arithmetic operations across different index fields " + "{0} != {1}".format(self._index.index_field, right._index.index_field) + ) + + if self._index_pattern != right._index_pattern: + raise ValueError( + "Can not perform arithmetic operations across different index patterns" + "{0} != {1}".format(self._index_pattern, right._index_pattern) + ) + + def arithmetic_op_fields(self, field_name, op, left_field, right_field): + result = self.copy() + + result._operations.arithmetic_op_fields(field_name, op, left_field, right_field) + + return result + + """ + Internal class to deal with column renaming and script_fields + """ + class DisplayNameToFieldNameMapper: + def __init__(self, + field_to_display_names=None, + display_to_field_names=None): + + if field_to_display_names is not None: + self._field_to_display_names = field_to_display_names + else: + self._field_to_display_names = dict() + + if display_to_field_names is not None: + self._display_to_field_names = display_to_field_names + else: + self._display_to_field_names = dict() + + def rename_display_name(self, renames): + for current_display_name, new_display_name in renames.items(): + if current_display_name in self._display_to_field_names: + # has been renamed already - update name + field_name = self._display_to_field_names[current_display_name] + del self._display_to_field_names[current_display_name] + del self._field_to_display_names[field_name] + self._display_to_field_names[new_display_name] = field_name + self._field_to_display_names[field_name] = new_display_name + else: + # new rename - assume 'current_display_name' is 'field_name' + field_name = current_display_name + + # if field_name is already mapped ignore + if field_name not in self._field_to_display_names: + self._display_to_field_names[new_display_name] = field_name + self._field_to_display_names[field_name] = new_display_name + + def field_names_to_list(self): + return self._field_to_display_names.keys() + + def display_names_to_list(self): + return self._display_to_field_names.keys() + + # Return mapper values as dict + def display_names_mapper(self): + return self._field_to_display_names + + @property + def empty(self): + return not self._display_to_field_names + + def field_to_display_names(self, field_names): + if self.empty: + return field_names + + display_names = [] + + for field_name in field_names: + if field_name in self._field_to_display_names: + display_name = self._field_to_display_names[field_name] + else: + display_name = field_name + display_names.append(display_name) + + return display_names + + def display_to_field_names(self, display_names): + if self.empty: + return display_names + + field_names = [] + + for display_name in display_names: + if display_name in self._display_to_field_names: + field_name = self._display_to_field_names[display_name] + else: + field_name = display_name + field_names.append(field_name) + + return field_names + + def __constructor__(self, *args, **kwargs): + return type(self)(*args, **kwargs) + + def copy(self): + return self.__constructor__( + field_to_display_names=self._field_to_display_names, + display_to_field_names = self._display_to_field_names + ) + diff --git a/eland/series.py b/eland/series.py index 6318028..9c96003 100644 --- a/eland/series.py +++ b/eland/series.py @@ -15,7 +15,7 @@ Based on NDFrame which underpins eland.1DataFrame """ -import warnings +from io import StringIO import pandas as pd @@ -98,6 +98,20 @@ class Series(NDFrame): name = property(_get_name) + def rename(self, new_name): + """ + ONLY COLUMN rename supported + + Parameters + ---------- + new_name + + Returns + ------- + + """ + return Series(query_compiler=self._query_compiler.rename({self.name: new_name})) + def head(self, n=5): return Series(query_compiler=self._query_compiler.head(n)) @@ -141,7 +155,7 @@ class Series(NDFrame): """ if not isinstance(es_size, int): raise TypeError("es_size must be a positive integer.") - if not es_size>0: + if not es_size > 0: raise ValueError("es_size must be a positive integer.") return self._query_compiler.value_counts(es_size) @@ -276,3 +290,35 @@ class Series(NDFrame): """ return 1 + + def info_es(self): + buf = StringIO() + + super()._info_es(buf) + + return buf.getvalue() + + def __truediv__(self, right): + return self.truediv(right) + + def truediv(self, right): + """ + return a / b + + a & b == Series + a & b must share same eland.Client, index_pattern and index_field + """ + if isinstance(right, Series): + # Check compatibility + self._query_compiler.check_arithmetics(right._query_compiler) + + field_name = "{0}_{1}_{2}".format(self.name, "truediv", right.name) + + # Compatible, so create new Series + return Series(query_compiler=self._query_compiler.arithmetic_op_fields( + field_name, 'truediv', self.name, right.name)) + else: + raise TypeError( + "Can only perform arithmetic operation on selected types " + "{0} != {1}".format(type(self), type(right)) + ) diff --git a/eland/tests/client/__init__.py b/eland/tests/client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eland/tests/client/test_eq_pytest.py b/eland/tests/client/test_eq_pytest.py new file mode 100644 index 0000000..332d6f4 --- /dev/null +++ b/eland/tests/client/test_eq_pytest.py @@ -0,0 +1,28 @@ +# File called _pytest for PyCharm compatability +from elasticsearch import Elasticsearch + +import eland as ed +from eland.tests.common import TestData + +import pytest + + +class TestClientEq(TestData): + + def test_self_eq(self): + es = Elasticsearch('localhost') + + client = ed.Client(es) + + assert client != es + + assert client == client + + def test_non_self_ne(self): + es1 = Elasticsearch('localhost') + es2 = Elasticsearch('localhost') + + client1 = ed.Client(es1) + client2 = ed.Client(es2) + + assert client1 != client2 diff --git a/eland/tests/common.py b/eland/tests/common.py index 107b5ab..6549d73 100644 --- a/eland/tests/common.py +++ b/eland/tests/common.py @@ -80,7 +80,7 @@ def assert_eland_frame_equal(left, right): assert_frame_equal(left._to_pandas(), right._to_pandas()) -def assert_pandas_eland_series_equal(left, right): +def assert_pandas_eland_series_equal(left, right, check_less_precise=False): if not isinstance(left, pd.Series): raise AssertionError("Expected type {exp_type}, found {act_type} instead".format( exp_type='pd.Series', act_type=type(left))) @@ -90,4 +90,4 @@ def assert_pandas_eland_series_equal(left, right): exp_type='ed.Series', act_type=type(right))) # Use pandas tests to check similarity - assert_series_equal(left, right._to_pandas()) + assert_series_equal(left, right._to_pandas(), check_less_precise=check_less_precise) diff --git a/eland/tests/mappings/test_aggregatables_pytest.py b/eland/tests/mappings/test_aggregatables_pytest.py index 8d12f17..9d27ba7 100644 --- a/eland/tests/mappings/test_aggregatables_pytest.py +++ b/eland/tests/mappings/test_aggregatables_pytest.py @@ -8,7 +8,7 @@ class TestMappingsAggregatables(TestData): def test_ecommerce_all_aggregatables(self): ed_ecommerce = self.ed_ecommerce() - aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_columns() + aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_field_names() expected = {'category.keyword': 'category', 'currency': 'currency', @@ -67,6 +67,6 @@ class TestMappingsAggregatables(TestData): 'customer_first_name.keyword': 'customer_first_name', 'type': 'type', 'user': 'user'} - aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_columns(expected.values()) + aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_field_names(expected.values()) assert expected == aggregatables diff --git a/eland/tests/mappings/test_dtypes_pytest.py b/eland/tests/mappings/test_dtypes_pytest.py index 43d3e3e..e467e9a 100644 --- a/eland/tests/mappings/test_dtypes_pytest.py +++ b/eland/tests/mappings/test_dtypes_pytest.py @@ -21,6 +21,6 @@ class TestMappingsDtypes(TestData): pd_flights = self.pd_flights()[['Carrier', 'AvgTicketPrice', 'Cancelled']] pd_dtypes = pd_flights.dtypes - ed_dtypes = ed_flights._query_compiler._mappings.dtypes(columns=['Carrier', 'AvgTicketPrice', 'Cancelled']) + ed_dtypes = ed_flights._query_compiler._mappings.dtypes(field_names=['Carrier', 'AvgTicketPrice', 'Cancelled']) assert_series_equal(pd_dtypes, ed_dtypes) diff --git a/eland/tests/mappings/test_numeric_source_fields_pytest.py b/eland/tests/mappings/test_numeric_source_fields_pytest.py index 9611a1f..a63e94d 100644 --- a/eland/tests/mappings/test_numeric_source_fields_pytest.py +++ b/eland/tests/mappings/test_numeric_source_fields_pytest.py @@ -13,13 +13,13 @@ class TestMappingsNumericSourceFields(TestData): ed_flights = self.ed_flights() pd_flights = self.pd_flights() - ed_numeric = ed_flights._query_compiler._mappings.numeric_source_fields(columns=None, include_bool=False) + ed_numeric = ed_flights._query_compiler._mappings.numeric_source_fields(field_names=None, include_bool=False) pd_numeric = pd_flights.select_dtypes(include=np.number) assert pd_numeric.columns.to_list() == ed_numeric def test_ecommerce_selected_non_numeric_source_fields(self): - columns = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'user'] + field_names = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'user'] """ Note: non of there are numeric category object @@ -29,16 +29,16 @@ class TestMappingsNumericSourceFields(TestData): user object """ - ed_ecommerce = self.ed_ecommerce()[columns] - pd_ecommerce = self.pd_ecommerce()[columns] + ed_ecommerce = self.ed_ecommerce()[field_names] + pd_ecommerce = self.pd_ecommerce()[field_names] - ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False) + ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False) pd_numeric = pd_ecommerce.select_dtypes(include=np.number) assert pd_numeric.columns.to_list() == ed_numeric def test_ecommerce_selected_mixed_numeric_source_fields(self): - columns = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'total_quantity', 'user'] + field_names = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'total_quantity', 'user'] """ Note: one is numeric @@ -50,16 +50,16 @@ class TestMappingsNumericSourceFields(TestData): user object """ - ed_ecommerce = self.ed_ecommerce()[columns] - pd_ecommerce = self.pd_ecommerce()[columns] + ed_ecommerce = self.ed_ecommerce()[field_names] + pd_ecommerce = self.pd_ecommerce()[field_names] - ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False) + ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False) pd_numeric = pd_ecommerce.select_dtypes(include=np.number) assert pd_numeric.columns.to_list() == ed_numeric def test_ecommerce_selected_all_numeric_source_fields(self): - columns = ['total_quantity', 'taxful_total_price', 'taxless_total_price'] + field_names = ['total_quantity', 'taxful_total_price', 'taxless_total_price'] """ Note: all are numeric @@ -68,10 +68,10 @@ class TestMappingsNumericSourceFields(TestData): taxless_total_price float64 """ - ed_ecommerce = self.ed_ecommerce()[columns] - pd_ecommerce = self.pd_ecommerce()[columns] + ed_ecommerce = self.ed_ecommerce()[field_names] + pd_ecommerce = self.pd_ecommerce()[field_names] - ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False) + ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False) pd_numeric = pd_ecommerce.select_dtypes(include=np.number) assert pd_numeric.columns.to_list() == ed_numeric diff --git a/eland/tests/query_compiler/test_rename_pytest.py b/eland/tests/query_compiler/test_rename_pytest.py new file mode 100644 index 0000000..3948044 --- /dev/null +++ b/eland/tests/query_compiler/test_rename_pytest.py @@ -0,0 +1,75 @@ +# File called _pytest for PyCharm compatability +import pandas as pd + +from pandas.util.testing import assert_series_equal + +from eland import ElandQueryCompiler +from eland.tests.common import TestData + + +class TestQueryCompilerRename(TestData): + + def test_query_compiler_basic_rename(self): + field_names = [] + display_names = [] + + mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper() + + assert field_names == mapper.field_names_to_list() + assert display_names == mapper.display_names_to_list() + + field_names = ['a'] + display_names = ['A'] + update_A = {'a' : 'A'} + mapper.rename_display_name(update_A) + + assert field_names == mapper.field_names_to_list() + assert display_names == mapper.display_names_to_list() + + field_names = ['a', 'b'] + display_names = ['A', 'B'] + + update_B = {'b' : 'B'} + mapper.rename_display_name(update_B) + + assert field_names == mapper.field_names_to_list() + assert display_names == mapper.display_names_to_list() + + field_names = ['a', 'b'] + display_names = ['AA', 'B'] + + update_AA = {'A' : 'AA'} + mapper.rename_display_name(update_AA) + + assert field_names == mapper.field_names_to_list() + assert display_names == mapper.display_names_to_list() + + def test_query_compiler_basic_rename_columns(self): + columns = ['a', 'b', 'c', 'd'] + + mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper() + + display_names = ['A', 'b', 'c', 'd'] + update_A = {'a' : 'A'} + mapper.rename_display_name(update_A) + + assert display_names == mapper.display_names(columns) + + # Invalid update + display_names = ['A', 'b', 'c', 'd'] + update_ZZ = {'a' : 'ZZ'} + mapper.rename_display_name(update_ZZ) + + assert display_names == mapper.display_names(columns) + + display_names = ['AA', 'b', 'c', 'd'] + update_AA = {'A' : 'AA'} # already renamed to 'A' + mapper.rename_display_name(update_AA) + + assert display_names == mapper.display_names(columns) + + display_names = ['AA', 'b', 'C', 'd'] + update_AA_C = {'a' : 'AA', 'c' : 'C'} # 'a' rename ignored + mapper.rename_display_name(update_AA_C) + + assert display_names == mapper.display_names(columns) diff --git a/eland/tests/series/test_arithmetics_pytest.py b/eland/tests/series/test_arithmetics_pytest.py new file mode 100644 index 0000000..589853c --- /dev/null +++ b/eland/tests/series/test_arithmetics_pytest.py @@ -0,0 +1,50 @@ +# File called _pytest for PyCharm compatability +import eland as ed +from eland.tests.common import TestData, assert_pandas_eland_series_equal +from pandas.util.testing import assert_series_equal +import pytest + + +class TestSeriesArithmetics(TestData): + + def test_ecommerce_series_invalid_div(self): + pd_df = self.pd_ecommerce() + ed_df = self.ed_ecommerce() + + # eland / pandas == error + with pytest.raises(TypeError): + ed_df['total_quantity'] / pd_df['taxful_total_price'] + + def test_ecommerce_series_div(self): + pd_df = self.pd_ecommerce() + ed_df = self.ed_ecommerce() + + pd_avg_price = pd_df['total_quantity'] / pd_df['taxful_total_price'] + print(pd_avg_price) # this has None as name + + ed_avg_price = ed_df['total_quantity'] / ed_df['taxful_total_price'] + print(ed_avg_price) + + assert_pandas_eland_series_equal(pd_avg_price, ed_avg_price, check_less_precise=True) + + def test_ecommerce_series_div_float(self): + pd_df = self.pd_ecommerce() + ed_df = self.ed_ecommerce() + + pd_avg_price = pd_df['total_quantity'] / 10.0 + print(pd_avg_price) + + ed_avg_price = ed_df['total_quantity'] / 10.0 + print(ed_avg_price) + + def test_ecommerce_series_div_other(self): + ed_df = self.ed_ecommerce() + + ed_s1 = ed_df.total_quantity + ed_s2 = ed_df.taxful_total_price + + print(ed_s1) + print(ed_s2) + + print(ed_s1) + print(ed_s2) diff --git a/eland/tests/series/test_rename_pytest.py b/eland/tests/series/test_rename_pytest.py new file mode 100644 index 0000000..89eb7f7 --- /dev/null +++ b/eland/tests/series/test_rename_pytest.py @@ -0,0 +1,23 @@ +# File called _pytest for PyCharm compatability +import eland as ed +from eland.tests import ELASTICSEARCH_HOST +from eland.tests import FLIGHTS_INDEX_NAME +from eland.tests.common import TestData +from eland.tests.common import assert_pandas_eland_series_equal + + +class TestSeriesRename(TestData): + + def test_rename(self): + pd_carrier = self.pd_flights()['Carrier'] + ed_carrier = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier') + + assert_pandas_eland_series_equal(pd_carrier, ed_carrier) + + pd_renamed = pd_carrier.rename("renamed") + ed_renamed = ed_carrier.rename("renamed") + + assert_pandas_eland_series_equal(pd_renamed, ed_renamed) + + + From 5d119215f8e24b542ee1a7b3f6ea9906e71cfbf0 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Thu, 21 Nov 2019 20:37:54 +0000 Subject: [PATCH 2/9] Fixing rename and truediv issues tests pass TODO - implement additional orithmetic ops --- eland/dataframe.py | 4 +- eland/operations.py | 69 +++++++++++++------ eland/query_compiler.py | 34 +++++---- eland/series.py | 66 ++++++++++++++++-- .../query_compiler/test_rename_pytest.py | 8 +-- eland/tests/series/test_arithmetics_pytest.py | 21 ++---- eland/tests/series/test_name_pytest.py | 32 +++++++++ 7 files changed, 172 insertions(+), 62 deletions(-) create mode 100644 eland/tests/series/test_name_pytest.py diff --git a/eland/dataframe.py b/eland/dataframe.py index 852e06a..51373ed 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -389,10 +389,10 @@ class DataFrame(NDFrame): [27 rows x 5 columns] Operations: - tasks: [('boolean_filter', {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}), ('columns', ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']), ('tail', ('_doc', 5))] + tasks: [('boolean_filter', {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}), ('field_names', ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']), ('tail', ('_doc', 5))] size: 5 sort_params: _doc:desc - columns: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin'] + field_names: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin'] post_processing: ['sort_index'] """ diff --git a/eland/operations.py b/eland/operations.py index aa9ba2f..5c40778 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -881,31 +881,58 @@ class Operations: left_field = item[1][1][1][0] right_field = item[1][1][1][1] - """ - (if op_name = 'truediv') - - "script_fields": { - "field_name": { - "script": { - "source": "doc[left_field].value / doc[right_field].value" - } + if isinstance(right_field, str): + """ + (if op_name = 'truediv') + + "script_fields": { + "field_name": { + "script": { + "source": "doc[left_field].value / doc[right_field].value" + } + } + } + """ + if op_name == 'truediv': + op = '/' + else: + raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) + + source = "doc['{0}'].value {1} doc['{2}'].value".format(left_field, op, right_field) + + if query_params['query_script_fields'] is None: + query_params['query_script_fields'] = {} + query_params['query_script_fields'][field_name] = { + 'script': { + 'source': source + } } - } - """ - if op_name == 'truediv': - op = '/' else: - raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) + """ + (if op_name = 'truediv') - source = "doc['{0}'].value {1} doc['{2}'].value".format(left_field, op, right_field) - - if query_params['query_script_fields'] is None: - query_params['query_script_fields'] = {} - query_params['query_script_fields'][field_name] = { - 'script': { - 'source': source + "script_fields": { + "field_name": { + "script": { + "source": "doc[left_field].value / right_field" + } + } + } + """ + if op_name == 'truediv': + op = '/' + else: + raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) + + source = "doc['{0}'].value {1} {2}".format(left_field, op, right_field) + + if query_params['query_script_fields'] is None: + query_params['query_script_fields'] = {} + query_params['query_script_fields'][field_name] = { + 'script': { + 'source': source + } } - } return query_params, post_processing diff --git a/eland/query_compiler.py b/eland/query_compiler.py index e94e605..ef01756 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -96,6 +96,7 @@ class ElandQueryCompiler: self._operations.set_field_names(columns) columns = property(_get_columns, _set_columns) + index = property(_get_index) @property @@ -241,9 +242,9 @@ class ElandQueryCompiler: # _source may not contain all columns in the mapping # therefore, fill in missing columns # (note this returns self.columns NOT IN df.columns) - missing_columns = list(set(self.columns) - set(df.columns)) + missing_field_names = list(set(self.field_names) - set(df.columns)) - for missing in missing_columns: + for missing in missing_field_names: is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing) df[missing] = pd.Series(dtype=pd_dtype) @@ -252,7 +253,8 @@ class ElandQueryCompiler: df.rename(columns=self._name_mapper.display_names_mapper(), inplace=True) # Sort columns in mapping order - df = df[self.columns] + if len(self.columns) > 1: + df = df[self.columns] return partial_result, df @@ -343,12 +345,14 @@ class ElandQueryCompiler: index_field=self._index.index_field, operations=self._operations.copy(), name_mapper=self._name_mapper.copy()) - def rename(self, renames): - result = self.copy() - - result._name_mapper.rename_display_name(renames) - - return result + def rename(self, renames, inplace=False): + if inplace: + self._name_mapper.rename_display_name(renames) + return self + else: + result = self.copy() + result._name_mapper.rename_display_name(renames) + return result def head(self, n): result = self.copy() @@ -503,10 +507,10 @@ class ElandQueryCompiler: "{0} != {1}".format(self._index_pattern, right._index_pattern) ) - def arithmetic_op_fields(self, field_name, op, left_field, right_field): + def arithmetic_op_fields(self, new_field_name, op, left_field, right_field): result = self.copy() - result._operations.arithmetic_op_fields(field_name, op, left_field, right_field) + result._operations.arithmetic_op_fields(new_field_name, op, left_field, right_field) return result @@ -547,10 +551,10 @@ class ElandQueryCompiler: self._field_to_display_names[field_name] = new_display_name def field_names_to_list(self): - return self._field_to_display_names.keys() + return sorted(list(self._field_to_display_names.keys())) def display_names_to_list(self): - return self._display_to_field_names.keys() + return sorted(list(self._display_to_field_names.keys())) # Return mapper values as dict def display_names_mapper(self): @@ -595,7 +599,7 @@ class ElandQueryCompiler: def copy(self): return self.__constructor__( - field_to_display_names=self._field_to_display_names, - display_to_field_names = self._display_to_field_names + field_to_display_names=self._field_to_display_names.copy(), + display_to_field_names = self._display_to_field_names.copy() ) diff --git a/eland/series.py b/eland/series.py index 9c96003..4918b2d 100644 --- a/eland/series.py +++ b/eland/series.py @@ -18,6 +18,7 @@ Based on NDFrame which underpins eland.1DataFrame from io import StringIO import pandas as pd +import numpy as np from eland import NDFrame from eland.filter import NotFilter, Equal, Greater, Less, GreaterEqual, LessEqual, ScriptFilter, IsIn @@ -96,19 +97,58 @@ class Series(NDFrame): def _get_name(self): return self._query_compiler.columns[0] - name = property(_get_name) + def _set_name(self, name): + self._query_compiler.rename({self.name: name}, inplace=True) + + name = property(_get_name, _set_name) def rename(self, new_name): """ - ONLY COLUMN rename supported + Rename name of series. Only column rename is supported. This does not change the underlying + Elasticsearch index, but adds a soft link from the new name (column) to the Elasticsearch field name Parameters ---------- - new_name + new_name: str Returns ------- + eland.Series + eland.Series with new name. + See Also + -------- + :pandas_api_docs:pandas.Series.rename + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'flights') + >>> df.Carrier + 0 Kibana Airlines + 1 Logstash Airways + 2 Logstash Airways + 3 Kibana Airlines + 4 Kibana Airlines + ... + 13054 Logstash Airways + 13055 Logstash Airways + 13056 Logstash Airways + 13057 JetBeats + 13058 JetBeats + Name: Carrier, Length: 13059, dtype: object + >>> df.Carrier.rename('Airline') + 0 Kibana Airlines + 1 Logstash Airways + 2 Logstash Airways + 3 Kibana Airlines + 4 Kibana Airlines + ... + 13054 Logstash Airways + 13055 Logstash Airways + 13056 Logstash Airways + 13057 JetBeats + 13058 JetBeats + Name: Airline, Length: 13059, dtype: object """ return Series(query_compiler=self._query_compiler.rename({self.name: new_name})) @@ -312,11 +352,25 @@ class Series(NDFrame): # Check compatibility self._query_compiler.check_arithmetics(right._query_compiler) - field_name = "{0}_{1}_{2}".format(self.name, "truediv", right.name) + new_field_name = "{0}_{1}_{2}".format(self.name, "truediv", right.name) # Compatible, so create new Series - return Series(query_compiler=self._query_compiler.arithmetic_op_fields( - field_name, 'truediv', self.name, right.name)) + series = Series(query_compiler=self._query_compiler.arithmetic_op_fields( + new_field_name, 'truediv', self.name, right.name)) + series.name = None + + return series + elif isinstance(right, (int, float)): # TODO extend to numpy types + new_field_name = "{0}_{1}_{2}".format(self.name, "truediv", str(right).replace('.','_')) + + # Compatible, so create new Series + series = Series(query_compiler=self._query_compiler.arithmetic_op_fields( + new_field_name, 'truediv', self.name, float(right))) # force rhs to float + + # name of Series remains original name + series.name = self.name + + return series else: raise TypeError( "Can only perform arithmetic operation on selected types " diff --git a/eland/tests/query_compiler/test_rename_pytest.py b/eland/tests/query_compiler/test_rename_pytest.py index 3948044..40f0534 100644 --- a/eland/tests/query_compiler/test_rename_pytest.py +++ b/eland/tests/query_compiler/test_rename_pytest.py @@ -53,23 +53,23 @@ class TestQueryCompilerRename(TestData): update_A = {'a' : 'A'} mapper.rename_display_name(update_A) - assert display_names == mapper.display_names(columns) + assert display_names == mapper.field_to_display_names(columns) # Invalid update display_names = ['A', 'b', 'c', 'd'] update_ZZ = {'a' : 'ZZ'} mapper.rename_display_name(update_ZZ) - assert display_names == mapper.display_names(columns) + assert display_names == mapper.field_to_display_names(columns) display_names = ['AA', 'b', 'c', 'd'] update_AA = {'A' : 'AA'} # already renamed to 'A' mapper.rename_display_name(update_AA) - assert display_names == mapper.display_names(columns) + assert display_names == mapper.field_to_display_names(columns) display_names = ['AA', 'b', 'C', 'd'] update_AA_C = {'a' : 'AA', 'c' : 'C'} # 'a' rename ignored mapper.rename_display_name(update_AA_C) - assert display_names == mapper.display_names(columns) + assert display_names == mapper.field_to_display_names(columns) diff --git a/eland/tests/series/test_arithmetics_pytest.py b/eland/tests/series/test_arithmetics_pytest.py index 589853c..5a510ea 100644 --- a/eland/tests/series/test_arithmetics_pytest.py +++ b/eland/tests/series/test_arithmetics_pytest.py @@ -20,10 +20,7 @@ class TestSeriesArithmetics(TestData): ed_df = self.ed_ecommerce() pd_avg_price = pd_df['total_quantity'] / pd_df['taxful_total_price'] - print(pd_avg_price) # this has None as name - ed_avg_price = ed_df['total_quantity'] / ed_df['taxful_total_price'] - print(ed_avg_price) assert_pandas_eland_series_equal(pd_avg_price, ed_avg_price, check_less_precise=True) @@ -32,19 +29,15 @@ class TestSeriesArithmetics(TestData): ed_df = self.ed_ecommerce() pd_avg_price = pd_df['total_quantity'] / 10.0 - print(pd_avg_price) - ed_avg_price = ed_df['total_quantity'] / 10.0 - print(ed_avg_price) - def test_ecommerce_series_div_other(self): + assert_pandas_eland_series_equal(pd_avg_price, ed_avg_price, check_less_precise=True) + + def test_ecommerce_series_div_int(self): + pd_df = self.pd_ecommerce() ed_df = self.ed_ecommerce() - ed_s1 = ed_df.total_quantity - ed_s2 = ed_df.taxful_total_price + pd_avg_price = pd_df['total_quantity'] / int(10) + ed_avg_price = ed_df['total_quantity'] / int(10) - print(ed_s1) - print(ed_s2) - - print(ed_s1) - print(ed_s2) + assert_pandas_eland_series_equal(pd_avg_price, ed_avg_price, check_less_precise=True) diff --git a/eland/tests/series/test_name_pytest.py b/eland/tests/series/test_name_pytest.py new file mode 100644 index 0000000..5e757a8 --- /dev/null +++ b/eland/tests/series/test_name_pytest.py @@ -0,0 +1,32 @@ +# File called _pytest for PyCharm compatability +import eland as ed +from eland.tests import ELASTICSEARCH_HOST +from eland.tests import FLIGHTS_INDEX_NAME +from eland.tests.common import TestData +from eland.tests.common import assert_pandas_eland_series_equal + + +class TestSeriesName(TestData): + + def test_name(self): + # deep copy pandas DataFrame as .name alters this reference frame + pd_series = self.pd_flights()['Carrier'].copy(deep=True) + ed_series = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier') + + assert_pandas_eland_series_equal(pd_series, ed_series) + assert ed_series.name == pd_series.name + + pd_series.name = "renamed1" + ed_series.name = "renamed1" + + assert_pandas_eland_series_equal(pd_series, ed_series) + assert ed_series.name == pd_series.name + + pd_series.name = "renamed2" + ed_series.name = "renamed2" + + assert_pandas_eland_series_equal(pd_series, ed_series) + assert ed_series.name == pd_series.name + + + From 84e23ab5d1ee3956648b7bb9fb1ec0cb9e17389f Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Fri, 22 Nov 2019 15:44:55 +0000 Subject: [PATCH 3/9] Added Series metric aggs + Series docs Also, improved Series.to_string() --- .../source/reference/api/eland.Series.add.rst | 6 + .../reference/api/eland.Series.describe.rst | 6 + .../source/reference/api/eland.Series.div.rst | 6 + .../reference/api/eland.Series.empty.rst | 6 + .../reference/api/eland.Series.floordiv.rst | 6 + .../reference/api/eland.Series.head.rst | 6 + .../reference/api/eland.Series.index.rst | 6 + .../source/reference/api/eland.Series.max.rst | 6 + .../reference/api/eland.Series.mean.rst | 6 + .../source/reference/api/eland.Series.min.rst | 6 + .../source/reference/api/eland.Series.mod.rst | 6 + .../source/reference/api/eland.Series.mul.rst | 6 + .../reference/api/eland.Series.name.rst | 6 + .../reference/api/eland.Series.nunique.rst | 6 + .../source/reference/api/eland.Series.pow.rst | 6 + .../reference/api/eland.Series.rename.rst | 6 + docs/source/reference/api/eland.Series.rst | 6 + .../reference/api/eland.Series.shape.rst | 6 + .../source/reference/api/eland.Series.sub.rst | 6 + .../source/reference/api/eland.Series.sum.rst | 6 + .../reference/api/eland.Series.tail.rst | 6 + .../reference/api/eland.Series.to_string.rst | 6 + .../reference/api/eland.Series.truediv.rst | 6 + .../api/eland.Series.value_counts.rst | 2 +- docs/source/reference/dataframe.rst | 2 - docs/source/reference/series.rst | 70 +- eland/__init__.py | 1 + eland/common.py | 8 + eland/dataframe.py | 62 +- eland/ndframe.py | 11 +- eland/operations.py | 40 +- eland/query_compiler.py | 6 +- eland/series.py | 620 ++++++++++++++++-- eland/tests/__init__.py | 4 +- eland/tests/series/test_arithmetics_pytest.py | 50 +- eland/tests/series/test_info_es_pytest.py | 17 + eland/tests/series/test_metrics_pytest.py | 44 ++ eland/tests/series/test_repr_pytest.py | 14 +- 38 files changed, 973 insertions(+), 116 deletions(-) create mode 100644 docs/source/reference/api/eland.Series.add.rst create mode 100644 docs/source/reference/api/eland.Series.describe.rst create mode 100644 docs/source/reference/api/eland.Series.div.rst create mode 100644 docs/source/reference/api/eland.Series.empty.rst create mode 100644 docs/source/reference/api/eland.Series.floordiv.rst create mode 100644 docs/source/reference/api/eland.Series.head.rst create mode 100644 docs/source/reference/api/eland.Series.index.rst create mode 100644 docs/source/reference/api/eland.Series.max.rst create mode 100644 docs/source/reference/api/eland.Series.mean.rst create mode 100644 docs/source/reference/api/eland.Series.min.rst create mode 100644 docs/source/reference/api/eland.Series.mod.rst create mode 100644 docs/source/reference/api/eland.Series.mul.rst create mode 100644 docs/source/reference/api/eland.Series.name.rst create mode 100644 docs/source/reference/api/eland.Series.nunique.rst create mode 100644 docs/source/reference/api/eland.Series.pow.rst create mode 100644 docs/source/reference/api/eland.Series.rename.rst create mode 100644 docs/source/reference/api/eland.Series.rst create mode 100644 docs/source/reference/api/eland.Series.shape.rst create mode 100644 docs/source/reference/api/eland.Series.sub.rst create mode 100644 docs/source/reference/api/eland.Series.sum.rst create mode 100644 docs/source/reference/api/eland.Series.tail.rst create mode 100644 docs/source/reference/api/eland.Series.to_string.rst create mode 100644 docs/source/reference/api/eland.Series.truediv.rst create mode 100644 eland/common.py create mode 100644 eland/tests/series/test_info_es_pytest.py create mode 100644 eland/tests/series/test_metrics_pytest.py diff --git a/docs/source/reference/api/eland.Series.add.rst b/docs/source/reference/api/eland.Series.add.rst new file mode 100644 index 0000000..da552b7 --- /dev/null +++ b/docs/source/reference/api/eland.Series.add.rst @@ -0,0 +1,6 @@ +eland.Series.add +================ + +.. currentmodule:: eland + +.. automethod:: Series.add diff --git a/docs/source/reference/api/eland.Series.describe.rst b/docs/source/reference/api/eland.Series.describe.rst new file mode 100644 index 0000000..195c410 --- /dev/null +++ b/docs/source/reference/api/eland.Series.describe.rst @@ -0,0 +1,6 @@ +eland.Series.describe +===================== + +.. currentmodule:: eland + +.. automethod:: Series.describe diff --git a/docs/source/reference/api/eland.Series.div.rst b/docs/source/reference/api/eland.Series.div.rst new file mode 100644 index 0000000..0d9698b --- /dev/null +++ b/docs/source/reference/api/eland.Series.div.rst @@ -0,0 +1,6 @@ +eland.Series.div +================ + +.. currentmodule:: eland + +.. automethod:: Series.div diff --git a/docs/source/reference/api/eland.Series.empty.rst b/docs/source/reference/api/eland.Series.empty.rst new file mode 100644 index 0000000..6ca71ec --- /dev/null +++ b/docs/source/reference/api/eland.Series.empty.rst @@ -0,0 +1,6 @@ +eland.Series.empty +================== + +.. currentmodule:: eland + +.. autoattribute:: Series.empty diff --git a/docs/source/reference/api/eland.Series.floordiv.rst b/docs/source/reference/api/eland.Series.floordiv.rst new file mode 100644 index 0000000..543f47d --- /dev/null +++ b/docs/source/reference/api/eland.Series.floordiv.rst @@ -0,0 +1,6 @@ +eland.Series.floordiv +===================== + +.. currentmodule:: eland + +.. automethod:: Series.floordiv diff --git a/docs/source/reference/api/eland.Series.head.rst b/docs/source/reference/api/eland.Series.head.rst new file mode 100644 index 0000000..78bcdbb --- /dev/null +++ b/docs/source/reference/api/eland.Series.head.rst @@ -0,0 +1,6 @@ +eland.Series.head +================= + +.. currentmodule:: eland + +.. automethod:: Series.head diff --git a/docs/source/reference/api/eland.Series.index.rst b/docs/source/reference/api/eland.Series.index.rst new file mode 100644 index 0000000..e996294 --- /dev/null +++ b/docs/source/reference/api/eland.Series.index.rst @@ -0,0 +1,6 @@ +eland.Series.index +================== + +.. currentmodule:: eland + +.. autoattribute:: Series.index diff --git a/docs/source/reference/api/eland.Series.max.rst b/docs/source/reference/api/eland.Series.max.rst new file mode 100644 index 0000000..8deec8a --- /dev/null +++ b/docs/source/reference/api/eland.Series.max.rst @@ -0,0 +1,6 @@ +eland.Series.max +================ + +.. currentmodule:: eland + +.. automethod:: Series.max diff --git a/docs/source/reference/api/eland.Series.mean.rst b/docs/source/reference/api/eland.Series.mean.rst new file mode 100644 index 0000000..5d5f2de --- /dev/null +++ b/docs/source/reference/api/eland.Series.mean.rst @@ -0,0 +1,6 @@ +eland.Series.mean +================= + +.. currentmodule:: eland + +.. automethod:: Series.mean diff --git a/docs/source/reference/api/eland.Series.min.rst b/docs/source/reference/api/eland.Series.min.rst new file mode 100644 index 0000000..484e077 --- /dev/null +++ b/docs/source/reference/api/eland.Series.min.rst @@ -0,0 +1,6 @@ +eland.Series.min +================ + +.. currentmodule:: eland + +.. automethod:: Series.min diff --git a/docs/source/reference/api/eland.Series.mod.rst b/docs/source/reference/api/eland.Series.mod.rst new file mode 100644 index 0000000..2d63164 --- /dev/null +++ b/docs/source/reference/api/eland.Series.mod.rst @@ -0,0 +1,6 @@ +eland.Series.mod +================ + +.. currentmodule:: eland + +.. automethod:: Series.mod diff --git a/docs/source/reference/api/eland.Series.mul.rst b/docs/source/reference/api/eland.Series.mul.rst new file mode 100644 index 0000000..91b0c4f --- /dev/null +++ b/docs/source/reference/api/eland.Series.mul.rst @@ -0,0 +1,6 @@ +eland.Series.mul +================ + +.. currentmodule:: eland + +.. automethod:: Series.mul diff --git a/docs/source/reference/api/eland.Series.name.rst b/docs/source/reference/api/eland.Series.name.rst new file mode 100644 index 0000000..a4086ab --- /dev/null +++ b/docs/source/reference/api/eland.Series.name.rst @@ -0,0 +1,6 @@ +eland.Series.name +================= + +.. currentmodule:: eland + +.. autoattribute:: Series.name diff --git a/docs/source/reference/api/eland.Series.nunique.rst b/docs/source/reference/api/eland.Series.nunique.rst new file mode 100644 index 0000000..40e75ea --- /dev/null +++ b/docs/source/reference/api/eland.Series.nunique.rst @@ -0,0 +1,6 @@ +eland.Series.nunique +==================== + +.. currentmodule:: eland + +.. automethod:: Series.nunique diff --git a/docs/source/reference/api/eland.Series.pow.rst b/docs/source/reference/api/eland.Series.pow.rst new file mode 100644 index 0000000..858e518 --- /dev/null +++ b/docs/source/reference/api/eland.Series.pow.rst @@ -0,0 +1,6 @@ +eland.Series.pow +================ + +.. currentmodule:: eland + +.. automethod:: Series.pow diff --git a/docs/source/reference/api/eland.Series.rename.rst b/docs/source/reference/api/eland.Series.rename.rst new file mode 100644 index 0000000..19e38ac --- /dev/null +++ b/docs/source/reference/api/eland.Series.rename.rst @@ -0,0 +1,6 @@ +eland.Series.rename +=================== + +.. currentmodule:: eland + +.. automethod:: Series.rename diff --git a/docs/source/reference/api/eland.Series.rst b/docs/source/reference/api/eland.Series.rst new file mode 100644 index 0000000..451bfc9 --- /dev/null +++ b/docs/source/reference/api/eland.Series.rst @@ -0,0 +1,6 @@ +eland.Series +============ + +.. currentmodule:: eland + +.. autoclass:: Series diff --git a/docs/source/reference/api/eland.Series.shape.rst b/docs/source/reference/api/eland.Series.shape.rst new file mode 100644 index 0000000..fe1a581 --- /dev/null +++ b/docs/source/reference/api/eland.Series.shape.rst @@ -0,0 +1,6 @@ +eland.Series.shape +================== + +.. currentmodule:: eland + +.. autoattribute:: Series.shape diff --git a/docs/source/reference/api/eland.Series.sub.rst b/docs/source/reference/api/eland.Series.sub.rst new file mode 100644 index 0000000..e2d0a21 --- /dev/null +++ b/docs/source/reference/api/eland.Series.sub.rst @@ -0,0 +1,6 @@ +eland.Series.sub +================ + +.. currentmodule:: eland + +.. automethod:: Series.sub diff --git a/docs/source/reference/api/eland.Series.sum.rst b/docs/source/reference/api/eland.Series.sum.rst new file mode 100644 index 0000000..28ef324 --- /dev/null +++ b/docs/source/reference/api/eland.Series.sum.rst @@ -0,0 +1,6 @@ +eland.Series.sum +================ + +.. currentmodule:: eland + +.. automethod:: Series.sum diff --git a/docs/source/reference/api/eland.Series.tail.rst b/docs/source/reference/api/eland.Series.tail.rst new file mode 100644 index 0000000..109fd8a --- /dev/null +++ b/docs/source/reference/api/eland.Series.tail.rst @@ -0,0 +1,6 @@ +eland.Series.tail +================= + +.. currentmodule:: eland + +.. automethod:: Series.tail diff --git a/docs/source/reference/api/eland.Series.to_string.rst b/docs/source/reference/api/eland.Series.to_string.rst new file mode 100644 index 0000000..ed1ac41 --- /dev/null +++ b/docs/source/reference/api/eland.Series.to_string.rst @@ -0,0 +1,6 @@ +eland.Series.to_string +====================== + +.. currentmodule:: eland + +.. automethod:: Series.to_string diff --git a/docs/source/reference/api/eland.Series.truediv.rst b/docs/source/reference/api/eland.Series.truediv.rst new file mode 100644 index 0000000..f89cf08 --- /dev/null +++ b/docs/source/reference/api/eland.Series.truediv.rst @@ -0,0 +1,6 @@ +eland.Series.truediv +==================== + +.. currentmodule:: eland + +.. automethod:: Series.truediv diff --git a/docs/source/reference/api/eland.Series.value_counts.rst b/docs/source/reference/api/eland.Series.value_counts.rst index 8d020b0..930d9db 100644 --- a/docs/source/reference/api/eland.Series.value_counts.rst +++ b/docs/source/reference/api/eland.Series.value_counts.rst @@ -1,5 +1,5 @@ eland.Series.value_counts -=========================== +========================= .. currentmodule:: eland diff --git a/docs/source/reference/dataframe.rst b/docs/source/reference/dataframe.rst index e1e71fa..64f5b29 100644 --- a/docs/source/reference/dataframe.rst +++ b/docs/source/reference/dataframe.rst @@ -91,5 +91,3 @@ Elasticsearch utilities :toctree: api/ DataFrame.info_es - - diff --git a/docs/source/reference/series.rst b/docs/source/reference/series.rst index cbc8898..366e57a 100644 --- a/docs/source/reference/series.rst +++ b/docs/source/reference/series.rst @@ -5,9 +5,77 @@ Series ========= .. currentmodule:: eland +Constructor +~~~~~~~~~~~ +.. autosummary:: + :toctree: api/ + + Series + +Attributes and underlying data +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +**Axes** + +.. autosummary:: + :toctree: api/ + + Series.index + Series.shape + Series.name + Series.empty + +Indexing, iteration +~~~~~~~~~~~~~~~~~~~ +.. autosummary:: + :toctree: api/ + + Series.head + Series.tail + +Binary operator functions +~~~~~~~~~~~~~~~~~~~~~~~~~ +.. autosummary:: + :toctree: api/ + + Series.add + Series.sub + Series.mul + Series.div + Series.truediv + Series.floordiv + Series.mod + Series.pow + Computations / descriptive stats ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. autosummary:: :toctree: api/ - Series.value_counts \ No newline at end of file + Series.describe + Series.max + Series.mean + Series.min + Series.sum + Series.nunique + Series.value_counts + +Reindexing / selection / label manipulation +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +.. autosummary:: + :toctree: api/ + + Series.rename + +Serialization / IO / conversion +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +.. autosummary:: + :toctree: api/ + + Series.to_string + +Elasticsearch utilities +~~~~~~~~~~~~~~~~~~~~~~~ +.. autosummary:: + :toctree: api/ + + Series.info_es diff --git a/eland/__init__.py b/eland/__init__.py index 79b89f9..699e880 100644 --- a/eland/__init__.py +++ b/eland/__init__.py @@ -1,5 +1,6 @@ from __future__ import absolute_import +from eland.common import * from eland.client import * from eland.filter import * from eland.index import * diff --git a/eland/common.py b/eland/common.py new file mode 100644 index 0000000..ff36d08 --- /dev/null +++ b/eland/common.py @@ -0,0 +1,8 @@ +# Default number of rows displayed (different to pandas where ALL could be displayed) +DEFAULT_NUM_ROWS_DISPLAYED = 60 + +def docstring_parameter(*sub): + def dec(obj): + obj.__doc__ = obj.__doc__.format(*sub) + return obj + return dec diff --git a/eland/dataframe.py b/eland/dataframe.py index 51373ed..53a7f76 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -18,15 +18,7 @@ import eland.plotting as gfx from eland import NDFrame from eland import Series from eland.filter import BooleanFilter, ScriptFilter - -# Default number of rows displayed (different to pandas where ALL could be displayed) -DEFAULT_NUM_ROWS_DISPLAYED = 60 - -def docstring_parameter(*sub): - def dec(obj): - obj.__doc__ = obj.__doc__.format(*sub) - return obj - return dec +from eland.common import DEFAULT_NUM_ROWS_DISPLAYED, docstring_parameter class DataFrame(NDFrame): @@ -43,7 +35,7 @@ class DataFrame(NDFrame): - elasticsearch-py instance or - eland.Client instance index_pattern: str - Elasticsearch index pattern (e.g. 'flights' or 'filebeat-*') + Elasticsearch index pattern (e.g. 'flights' or 'filebeat-\*') columns: list of str, optional List of DataFrame columns. A subset of the Elasticsearch index's fields. index_field: str, optional @@ -98,7 +90,6 @@ class DataFrame(NDFrame): [5 rows x 2 columns] """ - def __init__(self, client=None, index_pattern=None, @@ -586,7 +577,7 @@ class DataFrame(NDFrame): max_rows = 1 # Create a slightly bigger dataframe than display - df = self._build_repr_df(max_rows + 1, max_cols) + df = self._build_repr(max_rows + 1) if buf is not None: _buf = _expand_user(_stringify_path(buf)) @@ -651,7 +642,7 @@ class DataFrame(NDFrame): max_rows = 1 # Create a slightly bigger dataframe than display - df = self._build_repr_df(max_rows + 1, max_cols) + df = self._build_repr(max_rows + 1) if buf is not None: _buf = _expand_user(_stringify_path(buf)) @@ -1064,3 +1055,48 @@ class DataFrame(NDFrame): return self._getitem(key) else: return default + + @property + def values(self): + """ + Not implemented. + + In pandas this returns a Numpy representation of the DataFrame. This would involve scan/scrolling the + entire index. + + If this is required, call ``ed.eland_to_pandas(ed_df).values``, _but beware this will scan/scroll the entire + Elasticsearch index(s) into memory_ + + See Also + -------- + :pandas_api_docs:`pandas.DataFrame.values` + + Examples + -------- + >>> ed_df = ed.DataFrame('localhost', 'flights', columns=['AvgTicketPrice', 'Carrier']).head(5) + >>> pd_df = ed.eland_to_pandas(ed_df) + >>> print("type(ed_df)={0}\\ntype(pd_df)={1}".format(type(ed_df), type(pd_df))) + type(ed_df)= + type(pd_df)= + >>> ed_df + AvgTicketPrice Carrier + 0 841.265642 Kibana Airlines + 1 882.982662 Logstash Airways + 2 190.636904 Logstash Airways + 3 181.694216 Kibana Airlines + 4 730.041778 Kibana Airlines + + [5 rows x 2 columns] + >>> pd_df.values + array([[841.2656419677076, 'Kibana Airlines'], + [882.9826615595518, 'Logstash Airways'], + [190.6369038508356, 'Logstash Airways'], + [181.69421554118, 'Kibana Airlines'], + [730.041778346198, 'Kibana Airlines']], dtype=object) + """ + raise NotImplementedError( + "This method would scan/scroll the entire Elasticsearch index(s) into memory." + "If this is explicitly required and there is sufficient memory, call `ed.eland_to_pandas(ed_df).values`" + ) + + to_numpy = values diff --git a/eland/ndframe.py b/eland/ndframe.py index c98dd22..3abfed6 100644 --- a/eland/ndframe.py +++ b/eland/ndframe.py @@ -31,7 +31,6 @@ from pandas.util._validators import validate_bool_kwarg from eland import ElandQueryCompiler - class NDFrame: def __init__(self, @@ -65,6 +64,7 @@ class NDFrame: See Also -------- :pandas_api_docs:`pandas.DataFrame.index` + :pandas_api_docs:`pandas.Series.index` Examples -------- @@ -72,6 +72,10 @@ class NDFrame: >>> assert isinstance(df.index, ed.Index) >>> df.index.index_field '_id' + >>> s = df['Carrier'] + >>> assert isinstance(s.index, ed.Index) + >>> s.index.index_field + '_id' """ return self._query_compiler.index @@ -104,9 +108,8 @@ class NDFrame: """ return self._query_compiler.dtypes - def _build_repr_df(self, num_rows, num_cols): - # Overriden version of BasePandasDataset._build_repr_df - # to avoid issues with concat + def _build_repr(self, num_rows): + # self could be Series or DataFrame if len(self.index) <= num_rows: return self._to_pandas() diff --git a/eland/operations.py b/eland/operations.py index 5c40778..20dfe14 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -588,6 +588,7 @@ class Operations: df = self._apply_df_post_processing(df, post_processing) collector.collect(df) + def iloc(self, index, field_names): # index and field_names are indexers task = ('iloc', (index, field_names)) @@ -881,9 +882,10 @@ class Operations: left_field = item[1][1][1][0] right_field = item[1][1][1][1] + # https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-api-reference-shared-java-lang.html#painless-api-reference-shared-Math if isinstance(right_field, str): """ - (if op_name = 'truediv') + (if op_name = '__truediv__') "script_fields": { "field_name": { @@ -893,12 +895,23 @@ class Operations: } } """ - if op_name == 'truediv': - op = '/' + if op_name == '__add__': + source = "doc['{0}'].value + doc['{1}'].value".format(left_field, right_field) + elif op_name == '__truediv__': + source = "doc['{0}'].value / doc['{1}'].value".format(left_field, right_field) + elif op_name == '__floordiv__': + source = "Math.floor(doc['{0}'].value / doc['{1}'].value)".format(left_field, right_field) + elif op_name == '__pow__': + source = "Math.pow(doc['{0}'].value, doc['{1}'].value)".format(left_field, right_field) + elif op_name == '__mod__': + source = "doc['{0}'].value % doc['{1}'].value".format(left_field, right_field) + elif op_name == '__mul__': + source = "doc['{0}'].value * doc['{1}'].value".format(left_field, right_field) + elif op_name == '__sub__': + source = "doc['{0}'].value - doc['{1}'].value".format(left_field, right_field) else: raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) - source = "doc['{0}'].value {1} doc['{2}'].value".format(left_field, op, right_field) if query_params['query_script_fields'] is None: query_params['query_script_fields'] = {} @@ -909,7 +922,7 @@ class Operations: } else: """ - (if op_name = 'truediv') + (if op_name = '__truediv__') "script_fields": { "field_name": { @@ -919,12 +932,23 @@ class Operations: } } """ - if op_name == 'truediv': - op = '/' + if op_name == '__add__': + source = "doc['{0}'].value + {1}".format(left_field, right_field) + elif op_name == '__truediv__': + source = "doc['{0}'].value / {1}".format(left_field, right_field) + elif op_name == '__floordiv__': + source = "Math.floor(doc['{0}'].value / {1})".format(left_field, right_field) + elif op_name == '__pow__': + source = "Math.pow(doc['{0}'].value, {1})".format(left_field, right_field) + elif op_name == '__mod__': + source = "doc['{0}'].value % {1}".format(left_field, right_field) + elif op_name == '__mul__': + source = "doc['{0}'].value * {1}".format(left_field, right_field) + elif op_name == '__sub__': + source = "doc['{0}'].value - {1}".format(left_field, right_field) else: raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) - source = "doc['{0}'].value {1} {2}".format(left_field, op, right_field) if query_params['query_script_fields'] is None: query_params['query_script_fields'] = {} diff --git a/eland/query_compiler.py b/eland/query_compiler.py index ef01756..e057807 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -239,9 +239,9 @@ class ElandQueryCompiler: # Create pandas DataFrame df = pd.DataFrame(data=rows, index=index) - # _source may not contain all columns in the mapping - # therefore, fill in missing columns - # (note this returns self.columns NOT IN df.columns) + # _source may not contain all field_names in the mapping + # therefore, fill in missing field_names + # (note this returns self.field_names NOT IN df.columns) missing_field_names = list(set(self.field_names) - set(df.columns)) for missing in missing_field_names: diff --git a/eland/series.py b/eland/series.py index 4918b2d..4e39e85 100644 --- a/eland/series.py +++ b/eland/series.py @@ -11,19 +11,26 @@ without storing the dataset in local memory. Implementation Details ---------------------- -Based on NDFrame which underpins eland.1DataFrame +Based on NDFrame which underpins eland.DataFrame """ +import sys +import warnings from io import StringIO import pandas as pd -import numpy as np +from pandas.io.common import _expand_user, _stringify_path from eland import NDFrame +from eland.common import DEFAULT_NUM_ROWS_DISPLAYED from eland.filter import NotFilter, Equal, Greater, Less, GreaterEqual, LessEqual, ScriptFilter, IsIn +def _get_method_name(): + return sys._getframe(1).f_code.co_name + + class Series(NDFrame): """ pandas.Series like API that proxies into Elasticsearch index(es). @@ -34,35 +41,35 @@ class Series(NDFrame): A reference to a Elasticsearch python client index_pattern : str - An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*). + An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-\*\). index_field : str The field to base the series on - See Also - -------- - - Examples - -------- - - import eland as ed - client = ed.Client(Elasticsearch()) - s = ed.DataFrame(client, 'reviews', 'date') - df.head() - reviewerId vendorId rating date - 0 0 0 5 2006-04-07 17:08 - 1 1 1 5 2006-05-04 12:16 - 2 2 2 4 2006-04-21 12:26 - 3 3 3 5 2006-04-18 15:48 - 4 3 4 5 2006-04-18 15:49 - - Notice that the types are based on Elasticsearch mappings - Notes ----- If the Elasticsearch index is deleted or index mappings are changed after this object is created, the object is not rebuilt and so inconsistencies can occur. + See Also + -------- + :pandas_api_docs:`pandas.Series` + + Examples + -------- + >>> ed.Series(client='localhost', index_pattern='flights', name='Carrier') + 0 Kibana Airlines + 1 Logstash Airways + 2 Logstash Airways + 3 Kibana Airlines + 4 Kibana Airlines + ... + 13054 Logstash Airways + 13055 Logstash Airways + 13056 Logstash Airways + 13057 JetBeats + 13058 JetBeats + Name: Carrier, Length: 13059, dtype: object """ def __init__(self, @@ -94,6 +101,34 @@ class Series(NDFrame): """ return len(self.index) == 0 + @property + def shape(self): + """ + Return a tuple representing the dimensionality of the Series. + + Returns + ------- + shape: tuple + + 0. number of rows + 1. number of columns + + Notes + ----- + - number of rows ``len(series)`` queries Elasticsearch + - number of columns == 1 + + Examples + -------- + >>> df = ed.Series('localhost', 'ecommerce', name='total_quantity') + >>> df.shape + (4675, 1) + """ + num_rows = len(self) + num_columns = 1 + + return num_rows, num_columns + def _get_name(self): return self._query_compiler.columns[0] @@ -118,7 +153,7 @@ class Series(NDFrame): See Also -------- - :pandas_api_docs:pandas.Series.rename + :pandas_api_docs:`pandas.Series.rename` Examples -------- @@ -200,12 +235,39 @@ class Series(NDFrame): return self._query_compiler.value_counts(es_size) + # dtype not implemented for Series as causes query to fail + # in pandas.core.computation.ops.Term.type + # ---------------------------------------------------------------------- # Rendering Methods def __repr__(self): - num_rows = pd.get_option("max_rows") or 60 + """ + Return a string representation for a particular Series. + """ + buf = StringIO() - return self.to_string(max_rows=num_rows) + # max_rows and max_cols determine the maximum size of the pretty printed tabular + # representation of the series. pandas defaults are 60 and 20 respectively. + # series where len(series) > max_rows shows a truncated view with 10 rows shown. + max_rows = pd.get_option("display.max_rows") + min_rows = pd.get_option("display.min_rows") + + if len(self) > max_rows: + max_rows = min_rows + + show_dimensions = pd.get_option("display.show_dimensions") + + self.to_string( + buf=buf, + name=self.name, + dtype=True, + min_rows=min_rows, + max_rows=max_rows, + length=show_dimensions, + ) + result = buf.getvalue() + + return result def to_string( self, @@ -217,33 +279,69 @@ class Series(NDFrame): length=False, dtype=False, name=False, - max_rows=None): - - if max_rows is None: + max_rows=None, + min_rows=None, + ): + # In pandas calling 'to_string' without max_rows set, will dump ALL rows - we avoid this + # by limiting rows by default. + num_rows = len(self) # avoid multiple calls + if num_rows <= DEFAULT_NUM_ROWS_DISPLAYED: + if max_rows is None: + max_rows = num_rows + else: + max_rows = min(num_rows, max_rows) + elif max_rows is None: warnings.warn("Series.to_string called without max_rows set " "- this will return entire index results. " - "Setting max_rows=60, overwrite if different behaviour is required.") - max_rows = 60 + "Setting max_rows={default}" + " overwrite if different behaviour is required." + .format(default=DEFAULT_NUM_ROWS_DISPLAYED), + UserWarning) + max_rows = DEFAULT_NUM_ROWS_DISPLAYED + + # because of the way pandas handles max_rows=0, not having this throws an error + # see eland issue #56 + if max_rows == 0: + max_rows = 1 # Create a slightly bigger dataframe than display - temp_df = self._build_repr_df(max_rows + 1, None) - if isinstance(temp_df, pd.DataFrame): - temp_df = temp_df[self.name] - temp_str = repr(temp_df) - if self.name is not None: - name_str = "Name: {}, ".format(str(self.name)) + temp_series = self._build_repr(max_rows + 1) + + if buf is not None: + _buf = _expand_user(_stringify_path(buf)) else: - name_str = "" - if len(self.index) > max_rows: - len_str = "Length: {}, ".format(len(self.index)) - else: - len_str = "" - dtype_str = "dtype: {}".format(temp_str.rsplit("dtype: ", 1)[-1]) - if len(self) == 0: - return "Series([], {}{}".format(name_str, dtype_str) - return temp_str.rsplit("\nName:", 1)[0] + "\n{}{}{}".format( - name_str, len_str, dtype_str - ) + _buf = StringIO() + + # Create repr of fake series without name, length, dtype summary + temp_str = temp_series.to_string(buf=_buf, + na_rep=na_rep, + float_format=float_format, + header=header, + index=index, + length=False, + dtype=False, + name=False, + max_rows=max_rows) + + # Create the summary + footer = "" + if name and self.name is not None: + footer += "Name: {}".format(str(self.name)) + if length and len(self) > max_rows: + if footer: + footer += ", " + footer += "Length: {}".format(len(self.index)) + if dtype: + if footer: + footer += ", " + footer += "dtype: {}".format(temp_series.dtype) + + if len(footer) > 0: + _buf.write("\n{}".format(footer)) + + if buf is None: + result = _buf.getvalue() + return result def _to_pandas(self): return self._query_compiler.to_pandas()[self.name] @@ -321,13 +419,16 @@ class Series(NDFrame): @property def ndim(self): """ - Returns 1 by definition of a Series1 + Returns 1 by definition of a Series Returns ------- int By definition 1 + See Also + -------- + :pandas_api_docs:`pandas.Series.ndim` """ return 1 @@ -338,34 +439,317 @@ class Series(NDFrame): return buf.getvalue() - def __truediv__(self, right): - return self.truediv(right) - - def truediv(self, right): + def __add__(self, right): """ - return a / b + Return addition of series and right, element-wise (binary operator add). + + Parameters + ---------- + right: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> df.total_quantity + 0 2 + 1 2 + 2 2 + 3 2 + 4 2 + Name: total_quantity, dtype: int64 + >>> df.taxful_total_price + df.total_quantity + 0 38.980000 + 1 55.980000 + 2 201.979996 + 3 176.979996 + 4 82.980003 + dtype: float64 + """ + return self._numeric_op(right, _get_method_name()) + + def __truediv__(self, right): + """ + Return floating division of series and right, element-wise (binary operator truediv). + + Parameters + ---------- + right: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> df.total_quantity + 0 2 + 1 2 + 2 2 + 3 2 + 4 2 + Name: total_quantity, dtype: int64 + >>> df.taxful_total_price / df.total_quantity + 0 18.490000 + 1 26.990000 + 2 99.989998 + 3 87.489998 + 4 40.490002 + dtype: float64 + """ + return self._numeric_op(right, _get_method_name()) + + def __floordiv__(self, right): + """ + Return integer division of series and right, element-wise (binary operator floordiv //). + + Parameters + ---------- + right: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> df.total_quantity + 0 2 + 1 2 + 2 2 + 3 2 + 4 2 + Name: total_quantity, dtype: int64 + >>> df.taxful_total_price // df.total_quantity + 0 18.0 + 1 26.0 + 2 99.0 + 3 87.0 + 4 40.0 + dtype: float64 + """ + return self._numeric_op(right, _get_method_name()) + + def __mod__(self, right): + """ + Return modulo of series and right, element-wise (binary operator mod %). + + Parameters + ---------- + right: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> df.total_quantity + 0 2 + 1 2 + 2 2 + 3 2 + 4 2 + Name: total_quantity, dtype: int64 + >>> df.taxful_total_price % df.total_quantity + 0 0.980000 + 1 1.980000 + 2 1.979996 + 3 0.979996 + 4 0.980003 + dtype: float64 + """ + return self._numeric_op(right, _get_method_name()) + + def __mul__(self, right): + """ + Return multiplication of series and right, element-wise (binary operator mul). + + Parameters + ---------- + right: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> df.total_quantity + 0 2 + 1 2 + 2 2 + 3 2 + 4 2 + Name: total_quantity, dtype: int64 + >>> df.taxful_total_price * df.total_quantity + 0 73.959999 + 1 107.959999 + 2 399.959991 + 3 349.959991 + 4 161.960007 + dtype: float64 + """ + return self._numeric_op(right, _get_method_name()) + + def __sub__(self, right): + """ + Return subtraction of series and right, element-wise (binary operator sub). + + Parameters + ---------- + right: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> df.total_quantity + 0 2 + 1 2 + 2 2 + 3 2 + 4 2 + Name: total_quantity, dtype: int64 + >>> df.taxful_total_price - df.total_quantity + 0 34.980000 + 1 51.980000 + 2 197.979996 + 3 172.979996 + 4 78.980003 + dtype: float64 + """ + return self._numeric_op(right, _get_method_name()) + + def __pow__(self, right): + """ + Return exponential power of series and right, element-wise (binary operator pow \**\). + + Parameters + ---------- + right: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> df.total_quantity + 0 2 + 1 2 + 2 2 + 3 2 + 4 2 + Name: total_quantity, dtype: int64 + >>> df.taxful_total_price ** df.total_quantity + 0 1367.520366 + 1 2913.840351 + 2 39991.998691 + 3 30617.998905 + 4 6557.760944 + dtype: float64 + """ + return self._numeric_op(right, _get_method_name()) + + add = __add__ + div = __truediv__ + divide = __truediv__ + floordiv = __floordiv__ + mod = __mod__ + mul = __mul__ + multiply = __mul__ + pow = __pow__ + sub = __sub__ + subtract = __sub__ + truediv = __truediv__ + + def _numeric_op(self, right, method_name): + """ + return a op b a & b == Series a & b must share same eland.Client, index_pattern and index_field + a == Series, b == numeric """ if isinstance(right, Series): # Check compatibility self._query_compiler.check_arithmetics(right._query_compiler) - new_field_name = "{0}_{1}_{2}".format(self.name, "truediv", right.name) + new_field_name = "{0}_{1}_{2}".format(self.name, method_name, right.name) # Compatible, so create new Series series = Series(query_compiler=self._query_compiler.arithmetic_op_fields( - new_field_name, 'truediv', self.name, right.name)) + new_field_name, method_name, self.name, right.name)) series.name = None return series - elif isinstance(right, (int, float)): # TODO extend to numpy types - new_field_name = "{0}_{1}_{2}".format(self.name, "truediv", str(right).replace('.','_')) + elif isinstance(right, (int, float)): # TODO extend to numpy types + new_field_name = "{0}_{1}_{2}".format(self.name, method_name, str(right).replace('.', '_')) # Compatible, so create new Series series = Series(query_compiler=self._query_compiler.arithmetic_op_fields( - new_field_name, 'truediv', self.name, float(right))) # force rhs to float + new_field_name, method_name, self.name, float(right))) # force rhs to float # name of Series remains original name series.name = self.name @@ -374,5 +758,123 @@ class Series(NDFrame): else: raise TypeError( "Can only perform arithmetic operation on selected types " - "{0} != {1}".format(type(self), type(right)) + "{0} != {1} for {2}".format(type(self), type(right), method_name) ) + + def max(self): + """ + Return the maximum of the Series values + + TODO - implement remainder of pandas arguments, currently non-numerics are not supported + + Returns + ------- + float + max value + + See Also + -------- + :pandas_api_docs:`pandas.Series.max` + + Examples + -------- + >>> s = ed.Series('localhost', 'flights', name='AvgTicketPrice') + >>> int(s.max()) + 1199 + """ + results = super().max() + return results.squeeze() + + def mean(self): + """ + Return the mean of the Series values + + TODO - implement remainder of pandas arguments, currently non-numerics are not supported + + Returns + ------- + float + max value + + See Also + -------- + :pandas_api_docs:`pandas.Series.mean` + + Examples + -------- + >>> s = ed.Series('localhost', 'flights', name='AvgTicketPrice') + >>> int(s.mean()) + 628 + """ + results = super().mean() + return results.squeeze() + + def min(self): + """ + Return the minimum of the Series values + + TODO - implement remainder of pandas arguments, currently non-numerics are not supported + + Returns + ------- + float + max value + + See Also + -------- + :pandas_api_docs:`pandas.Series.min` + + Examples + -------- + >>> s = ed.Series('localhost', 'flights', name='AvgTicketPrice') + >>> int(s.min()) + 100 + """ + results = super().min() + return results.squeeze() + + def sum(self): + """ + Return the sum of the Series values + + TODO - implement remainder of pandas arguments, currently non-numerics are not supported + + Returns + ------- + float + max value + + See Also + -------- + :pandas_api_docs:`pandas.Series.sum` + + Examples + -------- + >>> s = ed.Series('localhost', 'flights', name='AvgTicketPrice') + >>> int(s.sum()) + 8204364 + """ + results = super().sum() + return results.squeeze() + + def nunique(self): + """ + Return the sum of the Series values + + Returns + ------- + float + max value + + See Also + -------- + :pandas_api_docs:`pandas.Series.sum` + + Examples + -------- + >>> s = ed.Series('localhost', 'flights', name='Carrier') + >>> s.nunique() + 4 + """ + results = super().nunique() + return results.squeeze() diff --git a/eland/tests/__init__.py b/eland/tests/__init__.py index f5dfb16..4791380 100644 --- a/eland/tests/__init__.py +++ b/eland/tests/__init__.py @@ -279,10 +279,10 @@ ECOMMERCE_MAPPING = {"mappings": { "type": "keyword" }, "taxful_total_price": { - "type": "half_float" + "type": "float" }, "taxless_total_price": { - "type": "half_float" + "type": "float" }, "total_quantity": { "type": "integer" diff --git a/eland/tests/series/test_arithmetics_pytest.py b/eland/tests/series/test_arithmetics_pytest.py index 5a510ea..d13595f 100644 --- a/eland/tests/series/test_arithmetics_pytest.py +++ b/eland/tests/series/test_arithmetics_pytest.py @@ -4,6 +4,8 @@ from eland.tests.common import TestData, assert_pandas_eland_series_equal from pandas.util.testing import assert_series_equal import pytest +import numpy as np + class TestSeriesArithmetics(TestData): @@ -15,29 +17,35 @@ class TestSeriesArithmetics(TestData): with pytest.raises(TypeError): ed_df['total_quantity'] / pd_df['taxful_total_price'] - def test_ecommerce_series_div(self): - pd_df = self.pd_ecommerce() - ed_df = self.ed_ecommerce() + def test_ecommerce_series_basic_arithmetics(self): + pd_df = self.pd_ecommerce().head(100) + ed_df = self.ed_ecommerce().head(100) - pd_avg_price = pd_df['total_quantity'] / pd_df['taxful_total_price'] - ed_avg_price = ed_df['total_quantity'] / ed_df['taxful_total_price'] + ops = ['__add__', + '__truediv__', + '__floordiv__', + '__pow__', + '__mod__', + '__mul__', + '__sub__', + 'add', + 'truediv', + 'floordiv', + 'pow', + 'mod', + 'mul', + 'sub'] - assert_pandas_eland_series_equal(pd_avg_price, ed_avg_price, check_less_precise=True) + for op in ops: + pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity']) + ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) - def test_ecommerce_series_div_float(self): - pd_df = self.pd_ecommerce() - ed_df = self.ed_ecommerce() + pd_series = getattr(pd_df['taxful_total_price'], op)(10.56) + ed_series = getattr(ed_df['taxful_total_price'], op)(10.56) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) - pd_avg_price = pd_df['total_quantity'] / 10.0 - ed_avg_price = ed_df['total_quantity'] / 10.0 + pd_series = getattr(pd_df['taxful_total_price'], op)(int(8)) + ed_series = getattr(ed_df['taxful_total_price'], op)(int(8)) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) - assert_pandas_eland_series_equal(pd_avg_price, ed_avg_price, check_less_precise=True) - - def test_ecommerce_series_div_int(self): - pd_df = self.pd_ecommerce() - ed_df = self.ed_ecommerce() - - pd_avg_price = pd_df['total_quantity'] / int(10) - ed_avg_price = ed_df['total_quantity'] / int(10) - - assert_pandas_eland_series_equal(pd_avg_price, ed_avg_price, check_less_precise=True) diff --git a/eland/tests/series/test_info_es_pytest.py b/eland/tests/series/test_info_es_pytest.py new file mode 100644 index 0000000..cc6b633 --- /dev/null +++ b/eland/tests/series/test_info_es_pytest.py @@ -0,0 +1,17 @@ +# File called _pytest for PyCharm compatability + +from pandas.util.testing import assert_almost_equal + +from eland.tests.common import TestData + +import eland as ed + + +class TestSeriesInfoEs(TestData): + + def test_flights_info_es(self): + ed_flights = self.ed_flights()['AvgTicketPrice'] + + # No assertion, just test it can be called + info_es = ed_flights.info_es() + diff --git a/eland/tests/series/test_metrics_pytest.py b/eland/tests/series/test_metrics_pytest.py new file mode 100644 index 0000000..ef221ba --- /dev/null +++ b/eland/tests/series/test_metrics_pytest.py @@ -0,0 +1,44 @@ +# File called _pytest for PyCharm compatability + +from pandas.util.testing import assert_almost_equal + +from eland.tests.common import TestData + +import eland as ed + + +class TestSeriesMetrics(TestData): + + funcs = ['max', 'min', 'mean', 'sum'] + + def test_flights_metrics(self): + pd_flights = self.pd_flights()['AvgTicketPrice'] + ed_flights = self.ed_flights()['AvgTicketPrice'] + + for func in self.funcs: + pd_metric = getattr(pd_flights, func)() + ed_metric = getattr(ed_flights, func)() + assert_almost_equal(pd_metric, ed_metric, check_less_precise=True) + + def test_ecommerce_selected_non_numeric_source_fields(self): + # None of these are numeric + column = 'category' + + ed_ecommerce = self.ed_ecommerce()[column] + + for func in self.funcs: + ed_metric = getattr(ed_ecommerce, func)() + assert ed_metric.empty + + + def test_ecommerce_selected_all_numeric_source_fields(self): + # All of these are numeric + columns = ['total_quantity', 'taxful_total_price', 'taxless_total_price'] + + for column in columns: + pd_ecommerce = self.pd_ecommerce()[column] + ed_ecommerce = self.ed_ecommerce()[column] + + for func in self.funcs: + assert_almost_equal(getattr(pd_ecommerce, func)(), getattr(ed_ecommerce, func)(), + check_less_precise=True) diff --git a/eland/tests/series/test_repr_pytest.py b/eland/tests/series/test_repr_pytest.py index 9b937c8..e83d6e9 100644 --- a/eland/tests/series/test_repr_pytest.py +++ b/eland/tests/series/test_repr_pytest.py @@ -1,13 +1,14 @@ # File called _pytest for PyCharm compatability import eland as ed +import pandas as pd from eland.tests import ELASTICSEARCH_HOST -from eland.tests import FLIGHTS_INDEX_NAME +from eland.tests import FLIGHTS_INDEX_NAME, ECOMMERCE_INDEX_NAME from eland.tests.common import TestData class TestSeriesRepr(TestData): - def test_repr(self): + def test_repr_flights_carrier(self): pd_s = self.pd_flights()['Carrier'] ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier') @@ -15,3 +16,12 @@ class TestSeriesRepr(TestData): ed_repr = repr(ed_s) assert pd_repr == ed_repr + + def test_repr_flights_carrier_5(self): + pd_s = self.pd_flights()['Carrier'].head(5) + ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier').head(5) + + pd_repr = repr(pd_s) + ed_repr = repr(ed_s) + + assert pd_repr == ed_repr From 91c811345ca92b0458c993fe3bc89fc1ae83fd58 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Fri, 22 Nov 2019 16:22:16 +0000 Subject: [PATCH 4/9] Minor updates to docs and doctests --- .../api/eland.DataFrame.to_numpy.rst | 6 +++ .../reference/api/eland.DataFrame.values.rst | 6 +++ .../reference/api/eland.Series.info_es.rst | 6 +++ .../reference/api/eland.Series.to_numpy.rst | 6 +++ docs/source/reference/dataframe.rst | 2 + docs/source/reference/series.rst | 1 + eland/dataframe.py | 30 ++++++++++++--- eland/series.py | 37 +++++++++++++++++++ 8 files changed, 88 insertions(+), 6 deletions(-) create mode 100644 docs/source/reference/api/eland.DataFrame.to_numpy.rst create mode 100644 docs/source/reference/api/eland.DataFrame.values.rst create mode 100644 docs/source/reference/api/eland.Series.info_es.rst create mode 100644 docs/source/reference/api/eland.Series.to_numpy.rst diff --git a/docs/source/reference/api/eland.DataFrame.to_numpy.rst b/docs/source/reference/api/eland.DataFrame.to_numpy.rst new file mode 100644 index 0000000..0455c77 --- /dev/null +++ b/docs/source/reference/api/eland.DataFrame.to_numpy.rst @@ -0,0 +1,6 @@ +eland.DataFrame.to_numpy +======================== + +.. currentmodule:: eland + +.. automethod:: DataFrame.to_numpy diff --git a/docs/source/reference/api/eland.DataFrame.values.rst b/docs/source/reference/api/eland.DataFrame.values.rst new file mode 100644 index 0000000..3af3afc --- /dev/null +++ b/docs/source/reference/api/eland.DataFrame.values.rst @@ -0,0 +1,6 @@ +eland.DataFrame.values +====================== + +.. currentmodule:: eland + +.. autoattribute:: DataFrame.values diff --git a/docs/source/reference/api/eland.Series.info_es.rst b/docs/source/reference/api/eland.Series.info_es.rst new file mode 100644 index 0000000..2b3b104 --- /dev/null +++ b/docs/source/reference/api/eland.Series.info_es.rst @@ -0,0 +1,6 @@ +eland.Series.info_es +==================== + +.. currentmodule:: eland + +.. automethod:: Series.info_es diff --git a/docs/source/reference/api/eland.Series.to_numpy.rst b/docs/source/reference/api/eland.Series.to_numpy.rst new file mode 100644 index 0000000..e8f73d1 --- /dev/null +++ b/docs/source/reference/api/eland.Series.to_numpy.rst @@ -0,0 +1,6 @@ +eland.Series.to_numpy +===================== + +.. currentmodule:: eland + +.. automethod:: Series.to_numpy diff --git a/docs/source/reference/dataframe.rst b/docs/source/reference/dataframe.rst index 64f5b29..4757c0c 100644 --- a/docs/source/reference/dataframe.rst +++ b/docs/source/reference/dataframe.rst @@ -23,6 +23,7 @@ Attributes and underlying data DataFrame.columns DataFrame.dtypes DataFrame.select_dtypes + DataFrame.values DataFrame.empty DataFrame.shape @@ -81,6 +82,7 @@ Serialization / IO / conversion :toctree: api/ DataFrame.info + DataFrame.to_numpy DataFrame.to_csv DataFrame.to_html DataFrame.to_string diff --git a/docs/source/reference/series.rst b/docs/source/reference/series.rst index 366e57a..9e807aa 100644 --- a/docs/source/reference/series.rst +++ b/docs/source/reference/series.rst @@ -72,6 +72,7 @@ Serialization / IO / conversion :toctree: api/ Series.to_string + Series.to_numpy Elasticsearch utilities ~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/eland/dataframe.py b/eland/dataframe.py index 53a7f76..5bead9d 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -1064,12 +1064,31 @@ class DataFrame(NDFrame): In pandas this returns a Numpy representation of the DataFrame. This would involve scan/scrolling the entire index. - If this is required, call ``ed.eland_to_pandas(ed_df).values``, _but beware this will scan/scroll the entire - Elasticsearch index(s) into memory_ + If this is required, call ``ed.eland_to_pandas(ed_df).values``, *but beware this will scan/scroll the entire + Elasticsearch index(s) into memory.* See Also -------- :pandas_api_docs:`pandas.DataFrame.values` + eland_to_pandas + to_numpy + """ + self.to_numpy() + + def to_numpy(self): + """ + Not implemented. + + In pandas this returns a Numpy representation of the DataFrame. This would involve scan/scrolling the + entire index. + + If this is required, call ``ed.eland_to_pandas(ed_df).values``, *but beware this will scan/scroll the entire + Elasticsearch index(s) into memory.* + + See Also + -------- + :pandas_api_docs:`pandas.DataFrame.to_numpy` + eland_to_pandas Examples -------- @@ -1094,9 +1113,8 @@ class DataFrame(NDFrame): [181.69421554118, 'Kibana Airlines'], [730.041778346198, 'Kibana Airlines']], dtype=object) """ - raise NotImplementedError( - "This method would scan/scroll the entire Elasticsearch index(s) into memory." - "If this is explicitly required and there is sufficient memory, call `ed.eland_to_pandas(ed_df).values`" + raise AttributeError( + "This method would scan/scroll the entire Elasticsearch index(s) into memory. " + "If this is explicitly required, and there is sufficient memory, call `ed.eland_to_pandas(ed_df).values`" ) - to_numpy = values diff --git a/eland/series.py b/eland/series.py index 4e39e85..b68aa7d 100644 --- a/eland/series.py +++ b/eland/series.py @@ -878,3 +878,40 @@ class Series(NDFrame): """ results = super().nunique() return results.squeeze() + + #def values TODO - not implemented as causes current implementation of query to fail + + def to_numpy(self): + """ + Not implemented. + + In pandas this returns a Numpy representation of the Series. This would involve scan/scrolling the + entire index. + + If this is required, call ``ed.eland_to_pandas(ed_series).values``, *but beware this will scan/scroll the entire + Elasticsearch index(s) into memory.* + + See Also + -------- + :pandas_api_docs:`pandas.DataFrame.to_numpy` + eland_to_pandas + + Examples + -------- + >>> ed_s = ed.Series('localhost', 'flights', name='Carrier').head(5) + >>> pd_s = ed.eland_to_pandas(ed_s) + >>> print("type(ed_s)={0}\\ntype(pd_s)={1}".format(type(ed_s), type(pd_s))) + type(ed_s)= + type(pd_s)= + >>> ed_s + 0 Kibana Airlines + 1 Logstash Airways + 2 Logstash Airways + 3 Kibana Airlines + 4 Kibana Airlines + Name: Carrier, dtype: object + """ + raise NotImplementedError( + "This method would scan/scroll the entire Elasticsearch index(s) into memory." + "If this is explicitly required and there is sufficient memory, call `ed.eland_to_pandas(ed_df).values`" + ) From e755a2e1600e82f0708ba3bd9565f429bd95ff72 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Fri, 22 Nov 2019 16:29:51 +0000 Subject: [PATCH 5/9] Minor doc fix for Series.to_string --- eland/series.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/eland/series.py b/eland/series.py index b68aa7d..4b56450 100644 --- a/eland/series.py +++ b/eland/series.py @@ -23,7 +23,7 @@ import pandas as pd from pandas.io.common import _expand_user, _stringify_path from eland import NDFrame -from eland.common import DEFAULT_NUM_ROWS_DISPLAYED +from eland.common import DEFAULT_NUM_ROWS_DISPLAYED, docstring_parameter from eland.filter import NotFilter, Equal, Greater, Less, GreaterEqual, LessEqual, ScriptFilter, IsIn @@ -269,6 +269,7 @@ class Series(NDFrame): return result + @docstring_parameter(DEFAULT_NUM_ROWS_DISPLAYED) def to_string( self, buf=None, @@ -282,6 +283,17 @@ class Series(NDFrame): max_rows=None, min_rows=None, ): + """ + Render a string representation of the Series. + + Follows pandas implementation except when ``max_rows=None``. In this scenario, we set ``max_rows={0}`` to avoid + accidentally dumping an entire index. This can be overridden by explicitly setting ``max_rows``. + + See Also + -------- + :pandas_api_docs:`pandas.Series.to_string` + for argument details. + """ # In pandas calling 'to_string' without max_rows set, will dump ALL rows - we avoid this # by limiting rows by default. num_rows = len(self) # avoid multiple calls From ac8cb302de9f9234458673d458180f54ad04f7b9 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Mon, 25 Nov 2019 12:43:37 +0000 Subject: [PATCH 6/9] Updates based on PR review. --- eland/dataframe.py | 3 +- eland/mappings.py | 18 +++--- eland/operations.py | 12 +++- eland/query_compiler.py | 16 +++-- eland/series.py | 26 ++++++-- eland/tests/dataframe/test_dtypes_pytest.py | 3 + eland/tests/series/test_arithmetics_pytest.py | 61 ++++++++++++++++++- 7 files changed, 114 insertions(+), 25 deletions(-) diff --git a/eland/dataframe.py b/eland/dataframe.py index 5bead9d..c14e152 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -383,7 +383,8 @@ class DataFrame(NDFrame): tasks: [('boolean_filter', {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}), ('field_names', ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin']), ('tail', ('_doc', 5))] size: 5 sort_params: _doc:desc - field_names: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin'] + _source: ['timestamp', 'OriginAirportID', 'DestAirportID', 'FlightDelayMin'] + body: {'query': {'bool': {'must': [{'term': {'OriginAirportID': 'AMS'}}, {'range': {'FlightDelayMin': {'gt': 60}}}]}}, 'aggs': {}} post_processing: ['sort_index'] """ diff --git a/eland/mappings.py b/eland/mappings.py index 48140eb..dabbcef 100644 --- a/eland/mappings.py +++ b/eland/mappings.py @@ -1,5 +1,6 @@ import warnings +import numpy as np import pandas as pd from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype, is_string_dtype) @@ -454,13 +455,13 @@ class Mappings: """ if include_bool == True: df = self._mappings_capabilities[(self._mappings_capabilities._source == True) & - ((self._mappings_capabilities.pd_dtype == 'int64') | - (self._mappings_capabilities.pd_dtype == 'float64') | - (self._mappings_capabilities.pd_dtype == 'bool'))] + ((self._mappings_capabilities.pd_dtype == 'int64') | + (self._mappings_capabilities.pd_dtype == 'float64') | + (self._mappings_capabilities.pd_dtype == 'bool'))] else: df = self._mappings_capabilities[(self._mappings_capabilities._source == True) & - ((self._mappings_capabilities.pd_dtype == 'int64') | - (self._mappings_capabilities.pd_dtype == 'float64'))] + ((self._mappings_capabilities.pd_dtype == 'int64') | + (self._mappings_capabilities.pd_dtype == 'float64'))] # if field_names exists, filter index with field_names if field_names is not None: # reindex adds NA for non-existing field_names (non-numeric), so drop these after reindex @@ -493,13 +494,14 @@ class Mappings: Returns ------- dtypes: pd.Series - Source field name + pd_dtype + Source field name + pd_dtype as np.dtype """ if field_names is not None: return pd.Series( - {key: self._source_field_pd_dtypes[key] for key in field_names}) + {key: np.dtype(self._source_field_pd_dtypes[key]) for key in field_names}) - return pd.Series(self._source_field_pd_dtypes) + return pd.Series( + {key: np.dtype(value) for key, value in self._source_field_pd_dtypes.items()}) def info_es(self, buf): buf.write("Mappings:\n") diff --git a/eland/operations.py b/eland/operations.py index 20dfe14..9c7dfdc 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -77,7 +77,7 @@ class Operations: def set_field_names(self, field_names): # Setting field_names at different phases of the task list may result in different # operations. So instead of setting field_names once, set when it happens in call chain - if type(field_names) is not list: + if not isinstance(field_names, list): field_names = list(field_names) # TODO - field_name renaming @@ -538,6 +538,7 @@ class Operations: return collector.ret + def _es_results(self, query_compiler, collector): query_params, post_processing = self._resolve_tasks() @@ -989,9 +990,16 @@ class Operations: size, sort_params = Operations._query_params_to_size_and_sort(query_params) field_names = self.get_field_names() + script_fields = query_params['query_script_fields'] + query = Query(query_params['query']) + body = query.to_search_body() + if script_fields is not None: + body['script_fields'] = script_fields + buf.write(" size: {0}\n".format(size)) buf.write(" sort_params: {0}\n".format(sort_params)) - buf.write(" field_names: {0}\n".format(field_names)) + buf.write(" _source: {0}\n".format(field_names)) + buf.write(" body: {0}\n".format(body)) buf.write(" post_processing: {0}\n".format(post_processing)) def update_query(self, boolean_filter): diff --git a/eland/query_compiler.py b/eland/query_compiler.py index e057807..cf642ab 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -1,7 +1,5 @@ import pandas as pd -from pandas.core.dtypes.common import ( - is_list_like -) +import numpy as np from eland import Client from eland import Index @@ -300,6 +298,15 @@ class ElandQueryCompiler: else: out[field_name] = x else: + # Script fields end up here + + # Elasticsearch returns 'Infinity' as a string for np.inf values. + # Map this to a numeric value to avoid this whole Series being classed as an object + # TODO - create a lookup for script fields and dtypes to only map 'Infinity' + # if the field is numeric. This implementation will currently map + # any script field with "Infinity" as a string to np.inf + if x == 'Infinity': + x = np.inf out[name[:-1]] = x flatten(y) @@ -600,6 +607,5 @@ class ElandQueryCompiler: def copy(self): return self.__constructor__( field_to_display_names=self._field_to_display_names.copy(), - display_to_field_names = self._display_to_field_names.copy() + display_to_field_names=self._display_to_field_names.copy() ) - diff --git a/eland/series.py b/eland/series.py index 4b56450..3d364e1 100644 --- a/eland/series.py +++ b/eland/series.py @@ -19,6 +19,8 @@ import sys import warnings from io import StringIO +import numpy as np + import pandas as pd from pandas.io.common import _expand_user, _stringify_path @@ -140,7 +142,9 @@ class Series(NDFrame): def rename(self, new_name): """ Rename name of series. Only column rename is supported. This does not change the underlying - Elasticsearch index, but adds a soft link from the new name (column) to the Elasticsearch field name + Elasticsearch index, but adds a symbolic link from the new name (column) to the Elasticsearch field name. + + For instance, if a field was called 'tot_quan' it could be renamed 'Total Quantity'. Parameters ---------- @@ -358,6 +362,11 @@ class Series(NDFrame): def _to_pandas(self): return self._query_compiler.to_pandas()[self.name] + @property + def _dtype(self): + # DO NOT MAKE PUBLIC (i.e. def dtype) as this breaks query eval implementation + return self._query_compiler.dtypes[0] + def __gt__(self, other): if isinstance(other, Series): # Need to use scripted query to compare to values @@ -745,9 +754,15 @@ class Series(NDFrame): a == Series, b == numeric """ if isinstance(right, Series): - # Check compatibility + # Check compatibility of Elasticsearch cluster self._query_compiler.check_arithmetics(right._query_compiler) + # Check compatibility of dtypes + # either not a number? + if not (np.issubdtype(self._dtype, np.number) and np.issubdtype(right._dtype, np.number)): + # TODO - support limited ops on strings https://github.com/elastic/eland/issues/65 + raise TypeError("Unsupported operation: '{}' {} '{}'".format(self._dtype, method_name, right._dtype)) + new_field_name = "{0}_{1}_{2}".format(self.name, method_name, right.name) # Compatible, so create new Series @@ -756,12 +771,12 @@ class Series(NDFrame): series.name = None return series - elif isinstance(right, (int, float)): # TODO extend to numpy types + elif np.issubdtype(np.dtype(type(right)), np.number): # allow np types new_field_name = "{0}_{1}_{2}".format(self.name, method_name, str(right).replace('.', '_')) # Compatible, so create new Series series = Series(query_compiler=self._query_compiler.arithmetic_op_fields( - new_field_name, method_name, self.name, float(right))) # force rhs to float + new_field_name, method_name, self.name, right)) # name of Series remains original name series.name = self.name @@ -769,8 +784,7 @@ class Series(NDFrame): return series else: raise TypeError( - "Can only perform arithmetic operation on selected types " - "{0} != {1} for {2}".format(type(self), type(right), method_name) + "unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(right)) ) def max(self): diff --git a/eland/tests/dataframe/test_dtypes_pytest.py b/eland/tests/dataframe/test_dtypes_pytest.py index 2db1734..9ba44ff 100644 --- a/eland/tests/dataframe/test_dtypes_pytest.py +++ b/eland/tests/dataframe/test_dtypes_pytest.py @@ -16,6 +16,9 @@ class TestDataFrameDtypes(TestData): assert_series_equal(pd_flights.dtypes, ed_flights.dtypes) + for i in range(0, len(pd_flights.dtypes)-1): + assert type(pd_flights.dtypes[i]) == type(ed_flights.dtypes[i]) + def test_flights_select_dtypes(self): ed_flights = self.ed_flights_small() pd_flights = self.pd_flights_small() diff --git a/eland/tests/series/test_arithmetics_pytest.py b/eland/tests/series/test_arithmetics_pytest.py index d13595f..d57703a 100644 --- a/eland/tests/series/test_arithmetics_pytest.py +++ b/eland/tests/series/test_arithmetics_pytest.py @@ -1,11 +1,10 @@ # File called _pytest for PyCharm compatability -import eland as ed -from eland.tests.common import TestData, assert_pandas_eland_series_equal -from pandas.util.testing import assert_series_equal import pytest import numpy as np +from eland.tests.common import TestData, assert_pandas_eland_series_equal + class TestSeriesArithmetics(TestData): @@ -45,7 +44,63 @@ class TestSeriesArithmetics(TestData): ed_series = getattr(ed_df['taxful_total_price'], op)(10.56) assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + pd_series = getattr(pd_df['taxful_total_price'], op)(np.float32(1.879)) + ed_series = getattr(ed_df['taxful_total_price'], op)(np.float32(1.879)) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + pd_series = getattr(pd_df['taxful_total_price'], op)(int(8)) ed_series = getattr(ed_df['taxful_total_price'], op)(int(8)) assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + def test_supported_series_dtypes_ops(self): + pd_df = self.pd_ecommerce().head(100) + ed_df = self.ed_ecommerce().head(100) + + # Test some specific operations that are and aren't supported + numeric_ops = ['__add__', + '__truediv__', + '__floordiv__', + '__pow__', + '__mod__', + '__mul__', + '__sub__'] + + non_string_numeric_ops = ['__add__', + '__truediv__', + '__floordiv__', + '__pow__', + '__mod__', + '__sub__'] + # __mul__ is supported for int * str in pandas + + # float op float + for op in numeric_ops: + pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['taxless_total_price']) + ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['taxless_total_price']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + # int op float + for op in numeric_ops: + pd_series = getattr(pd_df['total_quantity'], op)(pd_df['taxless_total_price']) + ed_series = getattr(ed_df['total_quantity'], op)(ed_df['taxless_total_price']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + # float op int + for op in numeric_ops: + pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity']) + ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + # str op int (throws) + for op in non_string_numeric_ops: + with pytest.raises(TypeError): + pd_series = getattr(pd_df['currency'], op)(pd_df['total_quantity']) + with pytest.raises(TypeError): + ed_series = getattr(ed_df['currency'], op)(ed_df['total_quantity']) + + # int op str (throws) + for op in non_string_numeric_ops: + with pytest.raises(TypeError): + pd_series = getattr(pd_df['total_quantity'], op)(pd_df['currency']) + with pytest.raises(TypeError): + ed_series = getattr(ed_df['total_quantity'], op)(ed_df['currency']) From b99f25e4eea40578d75547abe8aa7b466b2cbc02 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Mon, 25 Nov 2019 15:00:02 +0000 Subject: [PATCH 7/9] Adding __r* operations and resolving issues with df.info() --- eland/dataframe.py | 5 ++ eland/operations.py | 81 ++++++++++++++----- eland/series.py | 55 ++++++++++++- eland/tests/series/test_arithmetics_pytest.py | 36 +++++++++ 4 files changed, 157 insertions(+), 20 deletions(-) diff --git a/eland/dataframe.py b/eland/dataframe.py index c14e152..b75fc1d 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -519,7 +519,12 @@ class DataFrame(NDFrame): else: _verbose_repr() + # pandas 0.25.1 uses get_dtype_counts() here. This + # returns a Series with strings as the index NOT dtypes. + # Therefore, to get consistent ordering we need to + # align types with pandas method. counts = self.dtypes.value_counts() + counts.index = counts.index.astype(str) dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k in sorted(counts.items())] lines.append('dtypes: {types}'.format(types=', '.join(dtypes))) diff --git a/eland/operations.py b/eland/operations.py index 9c7dfdc..de681fa 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -1,6 +1,7 @@ import copy from enum import Enum +import numpy as np import pandas as pd from eland import Index @@ -172,7 +173,7 @@ class Operations: # some metrics aggs (including cardinality) work on all aggregatable fields # therefore we include an optional all parameter on operations # that call _metric_aggs - if field_types=='aggregatable': + if field_types == 'aggregatable': source_fields = query_compiler._mappings.aggregatable_field_names(field_names) else: source_fields = query_compiler._mappings.numeric_source_fields(field_names) @@ -193,7 +194,7 @@ class Operations: # } results = {} - if field_types=='aggregatable': + if field_types == 'aggregatable': for key, value in source_fields.items(): results[value] = response['aggregations'][key]['value'] else: @@ -538,7 +539,6 @@ class Operations: return collector.ret - def _es_results(self, query_compiler, collector): query_params, post_processing = self._resolve_tasks() @@ -561,12 +561,24 @@ class Operations: is_scan = False if size is not None and size <= 10000: if size > 0: - es_results = query_compiler._client.search( - index=query_compiler._index_pattern, - size=size, - sort=sort_params, - body=body, - _source=field_names) + try: + es_results = query_compiler._client.search( + index=query_compiler._index_pattern, + size=size, + sort=sort_params, + body=body, + _source=field_names) + except: + # Catch ES error and print debug (currently to stdout) + error = { + 'index': query_compiler._index_pattern, + 'size': size, + 'sort': sort_params, + 'body': body, + '_source': field_names + } + print("Elasticsearch error:", error) + raise else: is_scan = True es_results = query_compiler._client.scan( @@ -589,7 +601,6 @@ class Operations: df = self._apply_df_post_processing(df, post_processing) collector.collect(df) - def iloc(self, index, field_names): # index and field_names are indexers task = ('iloc', (index, field_names)) @@ -884,7 +895,7 @@ class Operations: right_field = item[1][1][1][1] # https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-api-reference-shared-java-lang.html#painless-api-reference-shared-Math - if isinstance(right_field, str): + if isinstance(left_field, str) and isinstance(right_field, str): """ (if op_name = '__truediv__') @@ -913,7 +924,6 @@ class Operations: else: raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) - if query_params['query_script_fields'] is None: query_params['query_script_fields'] = {} query_params['query_script_fields'][field_name] = { @@ -921,7 +931,7 @@ class Operations: 'source': source } } - else: + elif isinstance(left_field, str) and np.issubdtype(np.dtype(type(right_field)), np.number): """ (if op_name = '__truediv__') @@ -949,18 +959,48 @@ class Operations: source = "doc['{0}'].value - {1}".format(left_field, right_field) else: raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) + elif np.issubdtype(np.dtype(type(left_field)), np.number) and isinstance(right_field, str): + """ + (if op_name = '__truediv__') - - if query_params['query_script_fields'] is None: - query_params['query_script_fields'] = {} - query_params['query_script_fields'][field_name] = { - 'script': { - 'source': source + "script_fields": { + "field_name": { + "script": { + "source": "left_field / doc['right_field'].value" + } } } + """ + if op_name == '__add__': + source = "{0} + doc['{1}'].value".format(left_field, right_field) + elif op_name == '__truediv__': + source = "{0} / doc['{1}'].value".format(left_field, right_field) + elif op_name == '__floordiv__': + source = "Math.floor({0} / doc['{1}'].value)".format(left_field, right_field) + elif op_name == '__pow__': + source = "Math.pow({0}, doc['{1}'].value)".format(left_field, right_field) + elif op_name == '__mod__': + source = "{0} % doc['{1}'].value".format(left_field, right_field) + elif op_name == '__mul__': + source = "{0} * doc['{1}'].value".format(left_field, right_field) + elif op_name == '__sub__': + source = "{0} - doc['{1}'].value".format(left_field, right_field) + else: + raise NotImplementedError("Not implemented operation '{0}'".format(op_name)) + else: + raise TypeError("Types for operation inconsistent {} {} {}", type(left_field), type(right_field), op_name) + + if query_params['query_script_fields'] is None: + query_params['query_script_fields'] = {} + query_params['query_script_fields'][field_name] = { + 'script': { + 'source': source + } + } return query_params, post_processing + def _resolve_post_processing_task(self, item, query_params, post_processing): # Just do this in post-processing if item[0] != 'field_names': @@ -968,6 +1008,7 @@ class Operations: return query_params, post_processing + def _size(self, query_params, post_processing): # Shrink wrap code around checking if size parameter is set size = query_params['query_size'] # can be None @@ -982,6 +1023,7 @@ class Operations: # This can return None return size + def info_es(self, buf): buf.write("Operations:\n") buf.write(" tasks: {0}\n".format(self._tasks)) @@ -1002,6 +1044,7 @@ class Operations: buf.write(" body: {0}\n".format(body)) buf.write(" post_processing: {0}\n".format(post_processing)) + def update_query(self, boolean_filter): task = ('boolean_filter', boolean_filter) self._tasks.append(task) diff --git a/eland/series.py b/eland/series.py index 3d364e1..0c1e546 100644 --- a/eland/series.py +++ b/eland/series.py @@ -499,6 +499,7 @@ class Series(NDFrame): """ return self._numeric_op(right, _get_method_name()) + def __truediv__(self, right): """ Return floating division of series and right, element-wise (binary operator truediv). @@ -528,7 +529,7 @@ class Series(NDFrame): 3 2 4 2 Name: total_quantity, dtype: int64 - >>> df.taxful_total_price / df.total_quantity + >>> df.taxful_total_price / df.total_quantity # doctest: +SKIP 0 18.490000 1 26.990000 2 99.989998 @@ -733,6 +734,21 @@ class Series(NDFrame): """ return self._numeric_op(right, _get_method_name()) + def __radd__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rtruediv__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rfloordiv__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rmod__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rmul__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rpow__(self, left): + return self._numeric_rop(left, _get_method_name()) + def __rsub__(self, left): + return self._numeric_rop(left, _get_method_name()) + add = __add__ div = __truediv__ divide = __truediv__ @@ -745,6 +761,18 @@ class Series(NDFrame): subtract = __sub__ truediv = __truediv__ + radd = __radd__ + rdiv = __rtruediv__ + rdivide = __rtruediv__ + rfloordiv = __rfloordiv__ + rmod = __rmod__ + rmul = __rmul__ + rmultiply = __rmul__ + rpow = __rpow__ + rsub = __rsub__ + rsubtract = __rsub__ + rtruediv = __rtruediv__ + def _numeric_op(self, right, method_name): """ return a op b @@ -787,6 +815,31 @@ class Series(NDFrame): "unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(right)) ) + def _numeric_rop(self, left, method_name): + """ + e.g. 1 + ed.Series + """ + op_method_name = str(method_name).replace('__r', '__') + if isinstance(left, Series): + # if both are Series, revese args and call normal op method and remove 'r' from radd etc. + return left._numeric_op(self, op_method_name) + elif np.issubdtype(np.dtype(type(left)), np.number): # allow np types + # Prefix new field name with 'f_' so it's a valid ES field name + new_field_name = "f_{0}_{1}_{2}".format(str(left).replace('.', '_'), op_method_name, self.name) + + # Compatible, so create new Series + series = Series(query_compiler=self._query_compiler.arithmetic_op_fields( + new_field_name, op_method_name, left, self.name)) + + # name of Series pinned to valid series (like pandas) + series.name = self.name + + return series + else: + raise TypeError( + "unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(left)) + ) + def max(self): """ Return the maximum of the Series values diff --git a/eland/tests/series/test_arithmetics_pytest.py b/eland/tests/series/test_arithmetics_pytest.py index d57703a..7a50cc9 100644 --- a/eland/tests/series/test_arithmetics_pytest.py +++ b/eland/tests/series/test_arithmetics_pytest.py @@ -104,3 +104,39 @@ class TestSeriesArithmetics(TestData): pd_series = getattr(pd_df['total_quantity'], op)(pd_df['currency']) with pytest.raises(TypeError): ed_series = getattr(ed_df['total_quantity'], op)(ed_df['currency']) + + def test_ecommerce_series_basic_rarithmetics(self): + pd_df = self.pd_ecommerce().head(10) + ed_df = self.ed_ecommerce().head(10) + + ops = ['__radd__', + '__rtruediv__', + '__rfloordiv__', + '__rpow__', + '__rmod__', + '__rmul__', + '__rsub__', + 'radd', + 'rtruediv', + 'rfloordiv', + 'rpow', + 'rmod', + 'rmul', + 'rsub'] + + for op in ops: + pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity']) + ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + pd_series = getattr(pd_df['taxful_total_price'], op)(3.141) + ed_series = getattr(ed_df['taxful_total_price'], op)(3.141) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + pd_series = getattr(pd_df['taxful_total_price'], op)(np.float32(2.879)) + ed_series = getattr(ed_df['taxful_total_price'], op)(np.float32(2.879)) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + pd_series = getattr(pd_df['taxful_total_price'], op)(int(6)) + ed_series = getattr(ed_df['taxful_total_price'], op)(int(6)) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) From 85422e2023baef071528ce48a94dd0e812c17825 Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Mon, 25 Nov 2019 15:49:27 +0000 Subject: [PATCH 8/9] Adding series __r* docs --- .../reference/api/eland.Series.radd.rst | 6 + .../reference/api/eland.Series.rdiv.rst | 6 + .../reference/api/eland.Series.rfloordiv.rst | 6 + .../reference/api/eland.Series.rmod.rst | 6 + .../reference/api/eland.Series.rmul.rst | 6 + .../reference/api/eland.Series.rpow.rst | 6 + .../reference/api/eland.Series.rsub.rst | 6 + .../reference/api/eland.Series.rtruediv.rst | 6 + docs/source/reference/series.rst | 8 + eland/series.py | 215 +++++++++++++++++- 10 files changed, 270 insertions(+), 1 deletion(-) create mode 100644 docs/source/reference/api/eland.Series.radd.rst create mode 100644 docs/source/reference/api/eland.Series.rdiv.rst create mode 100644 docs/source/reference/api/eland.Series.rfloordiv.rst create mode 100644 docs/source/reference/api/eland.Series.rmod.rst create mode 100644 docs/source/reference/api/eland.Series.rmul.rst create mode 100644 docs/source/reference/api/eland.Series.rpow.rst create mode 100644 docs/source/reference/api/eland.Series.rsub.rst create mode 100644 docs/source/reference/api/eland.Series.rtruediv.rst diff --git a/docs/source/reference/api/eland.Series.radd.rst b/docs/source/reference/api/eland.Series.radd.rst new file mode 100644 index 0000000..6bed65f --- /dev/null +++ b/docs/source/reference/api/eland.Series.radd.rst @@ -0,0 +1,6 @@ +eland.Series.radd +================= + +.. currentmodule:: eland + +.. automethod:: Series.radd diff --git a/docs/source/reference/api/eland.Series.rdiv.rst b/docs/source/reference/api/eland.Series.rdiv.rst new file mode 100644 index 0000000..6ef8511 --- /dev/null +++ b/docs/source/reference/api/eland.Series.rdiv.rst @@ -0,0 +1,6 @@ +eland.Series.rdiv +================= + +.. currentmodule:: eland + +.. automethod:: Series.rdiv diff --git a/docs/source/reference/api/eland.Series.rfloordiv.rst b/docs/source/reference/api/eland.Series.rfloordiv.rst new file mode 100644 index 0000000..d4bfc91 --- /dev/null +++ b/docs/source/reference/api/eland.Series.rfloordiv.rst @@ -0,0 +1,6 @@ +eland.Series.rfloordiv +====================== + +.. currentmodule:: eland + +.. automethod:: Series.rfloordiv diff --git a/docs/source/reference/api/eland.Series.rmod.rst b/docs/source/reference/api/eland.Series.rmod.rst new file mode 100644 index 0000000..6999399 --- /dev/null +++ b/docs/source/reference/api/eland.Series.rmod.rst @@ -0,0 +1,6 @@ +eland.Series.rmod +================= + +.. currentmodule:: eland + +.. automethod:: Series.rmod diff --git a/docs/source/reference/api/eland.Series.rmul.rst b/docs/source/reference/api/eland.Series.rmul.rst new file mode 100644 index 0000000..3a139ec --- /dev/null +++ b/docs/source/reference/api/eland.Series.rmul.rst @@ -0,0 +1,6 @@ +eland.Series.rmul +================= + +.. currentmodule:: eland + +.. automethod:: Series.rmul diff --git a/docs/source/reference/api/eland.Series.rpow.rst b/docs/source/reference/api/eland.Series.rpow.rst new file mode 100644 index 0000000..e057ed3 --- /dev/null +++ b/docs/source/reference/api/eland.Series.rpow.rst @@ -0,0 +1,6 @@ +eland.Series.rpow +================= + +.. currentmodule:: eland + +.. automethod:: Series.rpow diff --git a/docs/source/reference/api/eland.Series.rsub.rst b/docs/source/reference/api/eland.Series.rsub.rst new file mode 100644 index 0000000..ef524d2 --- /dev/null +++ b/docs/source/reference/api/eland.Series.rsub.rst @@ -0,0 +1,6 @@ +eland.Series.rsub +================= + +.. currentmodule:: eland + +.. automethod:: Series.rsub diff --git a/docs/source/reference/api/eland.Series.rtruediv.rst b/docs/source/reference/api/eland.Series.rtruediv.rst new file mode 100644 index 0000000..d60a5c1 --- /dev/null +++ b/docs/source/reference/api/eland.Series.rtruediv.rst @@ -0,0 +1,6 @@ +eland.Series.rtruediv +===================== + +.. currentmodule:: eland + +.. automethod:: Series.rtruediv diff --git a/docs/source/reference/series.rst b/docs/source/reference/series.rst index 9e807aa..1c30d82 100644 --- a/docs/source/reference/series.rst +++ b/docs/source/reference/series.rst @@ -45,6 +45,14 @@ Binary operator functions Series.floordiv Series.mod Series.pow + Series.radd + Series.rsub + Series.rmul + Series.rdiv + Series.rtruediv + Series.rfloordiv + Series.rmod + Series.rpow Computations / descriptive stats ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/eland/series.py b/eland/series.py index 0c1e546..0a3df97 100644 --- a/eland/series.py +++ b/eland/series.py @@ -482,6 +482,13 @@ class Series(NDFrame): 3 174.98 4 80.98 Name: taxful_total_price, dtype: float64 + >>> df.taxful_total_price + 1 + 0 37.980000 + 1 54.980000 + 2 200.979996 + 3 175.979996 + 4 81.980003 + Name: taxful_total_price, dtype: float64 >>> df.total_quantity 0 2 1 2 @@ -529,7 +536,7 @@ class Series(NDFrame): 3 2 4 2 Name: total_quantity, dtype: int64 - >>> df.taxful_total_price / df.total_quantity # doctest: +SKIP + >>> df.taxful_total_price / df.total_quantity 0 18.490000 1 26.990000 2 99.989998 @@ -735,18 +742,221 @@ class Series(NDFrame): return self._numeric_op(right, _get_method_name()) def __radd__(self, left): + """ + Return addition of series and left, element-wise (binary operator add). + + Parameters + ---------- + left: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> 1 + df.taxful_total_price + 0 37.980000 + 1 54.980000 + 2 200.979996 + 3 175.979996 + 4 81.980003 + Name: taxful_total_price, dtype: float64 + """ return self._numeric_rop(left, _get_method_name()) def __rtruediv__(self, left): + """ + Return division of series and left, element-wise (binary operator div). + + Parameters + ---------- + left: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> 1.0 / df.taxful_total_price + 0 0.027042 + 1 0.018525 + 2 0.005001 + 3 0.005715 + 4 0.012349 + Name: taxful_total_price, dtype: float64 + """ return self._numeric_rop(left, _get_method_name()) def __rfloordiv__(self, left): + """ + Return integer division of series and left, element-wise (binary operator floordiv //). + + Parameters + ---------- + left: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> 500.0 // df.taxful_total_price + 0 13.0 + 1 9.0 + 2 2.0 + 3 2.0 + 4 6.0 + Name: taxful_total_price, dtype: float64 + """ return self._numeric_rop(left, _get_method_name()) def __rmod__(self, left): + """ + Return modulo of series and left, element-wise (binary operator mod %). + + Parameters + ---------- + left: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> 500.0 % df.taxful_total_price + 0 19.260006 + 1 14.180004 + 2 100.040009 + 3 150.040009 + 4 14.119980 + Name: taxful_total_price, dtype: float64 + """ return self._numeric_rop(left, _get_method_name()) def __rmul__(self, left): + """ + Return multiplication of series and left, element-wise (binary operator mul). + + Parameters + ---------- + left: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> 10.0 * df.taxful_total_price + 0 369.799995 + 1 539.799995 + 2 1999.799957 + 3 1749.799957 + 4 809.800034 + Name: taxful_total_price, dtype: float64 + """ return self._numeric_rop(left, _get_method_name()) def __rpow__(self, left): + """ + Return exponential power of series and left, element-wise (binary operator pow \**\). + + Parameters + ---------- + left: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.total_quantity + 0 2 + 1 2 + 2 2 + 3 2 + 4 2 + Name: total_quantity, dtype: int64 + >>> np.int(2) ** df.total_quantity + 0 4.0 + 1 4.0 + 2 4.0 + 3 4.0 + 4 4.0 + Name: total_quantity, dtype: float64 + """ return self._numeric_rop(left, _get_method_name()) def __rsub__(self, left): + """ + Return subtraction of series and left, element-wise (binary operator sub). + + Parameters + ---------- + left: eland.Series + + Returns + ------- + eland.Series + + Examples + -------- + >>> df = ed.DataFrame('localhost', 'ecommerce').head(5) + >>> df.taxful_total_price + 0 36.98 + 1 53.98 + 2 199.98 + 3 174.98 + 4 80.98 + Name: taxful_total_price, dtype: float64 + >>> 1.0 - df.taxful_total_price + 0 -35.980000 + 1 -52.980000 + 2 -198.979996 + 3 -173.979996 + 4 -79.980003 + Name: taxful_total_price, dtype: float64 + """ return self._numeric_rop(left, _get_method_name()) add = __add__ @@ -989,6 +1199,9 @@ class Series(NDFrame): 3 Kibana Airlines 4 Kibana Airlines Name: Carrier, dtype: object + >>> pd_s.to_numpy() + array(['Kibana Airlines', 'Logstash Airways', 'Logstash Airways', + 'Kibana Airlines', 'Kibana Airlines'], dtype=object) """ raise NotImplementedError( "This method would scan/scroll the entire Elasticsearch index(s) into memory." From 9bbe9bbb1c0f68433a2365401303f0603d8572ef Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Mon, 25 Nov 2019 16:15:50 +0000 Subject: [PATCH 9/9] Fixing issue with addition for strings e.g. df['currency']+1 --- eland/series.py | 6 +- eland/tests/series/test_arithmetics_pytest.py | 64 ++++++++++++++++++- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/eland/series.py b/eland/series.py index 0a3df97..5b69b3f 100644 --- a/eland/series.py +++ b/eland/series.py @@ -1009,7 +1009,7 @@ class Series(NDFrame): series.name = None return series - elif np.issubdtype(np.dtype(type(right)), np.number): # allow np types + elif np.issubdtype(np.dtype(type(right)), np.number) and np.issubdtype(self._dtype, np.number): new_field_name = "{0}_{1}_{2}".format(self.name, method_name, str(right).replace('.', '_')) # Compatible, so create new Series @@ -1021,6 +1021,7 @@ class Series(NDFrame): return series else: + # TODO - support limited ops on strings https://github.com/elastic/eland/issues/65 raise TypeError( "unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(right)) ) @@ -1033,7 +1034,7 @@ class Series(NDFrame): if isinstance(left, Series): # if both are Series, revese args and call normal op method and remove 'r' from radd etc. return left._numeric_op(self, op_method_name) - elif np.issubdtype(np.dtype(type(left)), np.number): # allow np types + elif np.issubdtype(np.dtype(type(left)), np.number) and np.issubdtype(self._dtype, np.number): # Prefix new field name with 'f_' so it's a valid ES field name new_field_name = "f_{0}_{1}_{2}".format(str(left).replace('.', '_'), op_method_name, self.name) @@ -1046,6 +1047,7 @@ class Series(NDFrame): return series else: + # TODO - support limited ops on strings https://github.com/elastic/eland/issues/65 raise TypeError( "unsupported operand type(s) for '{}' {} '{}'".format(type(self), method_name, type(left)) ) diff --git a/eland/tests/series/test_arithmetics_pytest.py b/eland/tests/series/test_arithmetics_pytest.py index 7a50cc9..c3c0666 100644 --- a/eland/tests/series/test_arithmetics_pytest.py +++ b/eland/tests/series/test_arithmetics_pytest.py @@ -14,7 +14,7 @@ class TestSeriesArithmetics(TestData): # eland / pandas == error with pytest.raises(TypeError): - ed_df['total_quantity'] / pd_df['taxful_total_price'] + ed_series = ed_df['total_quantity'] / pd_df['taxful_total_price'] def test_ecommerce_series_basic_arithmetics(self): pd_df = self.pd_ecommerce().head(100) @@ -97,6 +97,10 @@ class TestSeriesArithmetics(TestData): pd_series = getattr(pd_df['currency'], op)(pd_df['total_quantity']) with pytest.raises(TypeError): ed_series = getattr(ed_df['currency'], op)(ed_df['total_quantity']) + with pytest.raises(TypeError): + pd_series = getattr(pd_df['currency'], op)(1) + with pytest.raises(TypeError): + ed_series = getattr(ed_df['currency'], op)(1) # int op str (throws) for op in non_string_numeric_ops: @@ -140,3 +144,61 @@ class TestSeriesArithmetics(TestData): pd_series = getattr(pd_df['taxful_total_price'], op)(int(6)) ed_series = getattr(ed_df['taxful_total_price'], op)(int(6)) assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + def test_supported_series_dtypes_rops(self): + pd_df = self.pd_ecommerce().head(100) + ed_df = self.ed_ecommerce().head(100) + + # Test some specific operations that are and aren't supported + numeric_ops = ['__radd__', + '__rtruediv__', + '__rfloordiv__', + '__rpow__', + '__rmod__', + '__rmul__', + '__rsub__'] + + non_string_numeric_ops = ['__radd__', + '__rtruediv__', + '__rfloordiv__', + '__rpow__', + '__rmod__', + '__rsub__'] + # __rmul__ is supported for int * str in pandas + + # float op float + for op in numeric_ops: + pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['taxless_total_price']) + ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['taxless_total_price']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + # int op float + for op in numeric_ops: + pd_series = getattr(pd_df['total_quantity'], op)(pd_df['taxless_total_price']) + ed_series = getattr(ed_df['total_quantity'], op)(ed_df['taxless_total_price']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + # float op int + for op in numeric_ops: + pd_series = getattr(pd_df['taxful_total_price'], op)(pd_df['total_quantity']) + ed_series = getattr(ed_df['taxful_total_price'], op)(ed_df['total_quantity']) + assert_pandas_eland_series_equal(pd_series, ed_series, check_less_precise=True) + + # str op int (throws) + for op in non_string_numeric_ops: + print(op) + with pytest.raises(TypeError): + pd_series = getattr(pd_df['currency'], op)(pd_df['total_quantity']) + with pytest.raises(TypeError): + ed_series = getattr(ed_df['currency'], op)(ed_df['total_quantity']) + with pytest.raises(TypeError): + pd_series = getattr(pd_df['currency'], op)(10.0) + with pytest.raises(TypeError): + ed_series = getattr(ed_df['currency'], op)(10.0) + + # int op str (throws) + for op in non_string_numeric_ops: + with pytest.raises(TypeError): + pd_series = getattr(pd_df['total_quantity'], op)(pd_df['currency']) + with pytest.raises(TypeError): + ed_series = getattr(ed_df['total_quantity'], op)(ed_df['currency'])