diff --git a/eland/dataframe.py b/eland/dataframe.py
index 98d6dab..852e06a 100644
--- a/eland/dataframe.py
+++ b/eland/dataframe.py
@@ -43,7 +43,7 @@ class DataFrame(NDFrame):
         - elasticsearch-py instance or
         - eland.Client instance
     index_pattern: str
-        Elasticsearch index pattern (e.g. 'flights' or 'filebeat-\*')
+        Elasticsearch index pattern (e.g. 'flights' or 'filebeat-*')
     columns: list of str, optional
         List of DataFrame columns. A subset of the Elasticsearch index's fields.
     index_field: str, optional
diff --git a/eland/mappings.py b/eland/mappings.py
index a4457f9..48140eb 100644
--- a/eland/mappings.py
+++ b/eland/mappings.py
@@ -182,7 +182,7 @@ class Mappings:
         """
         all_fields_caps_fields = all_fields_caps['fields']

-        columns = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable']
+        field_names = ['_source', 'es_dtype', 'pd_dtype', 'searchable', 'aggregatable']
         capability_matrix = {}

         for field, field_caps in all_fields_caps_fields.items():
@@ -208,7 +208,7 @@ class Mappings:
                                   format(field, vv['non_searchable_indices']),
                                   UserWarning)

-        capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=columns)
+        capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=field_names)

         return capability_matrix_df.sort_index()

@@ -325,14 +325,14 @@ class Mappings:
         mappings = {}
         mappings['properties'] = {}

-        for column_name, dtype in dataframe.dtypes.iteritems():
-            if geo_points is not None and column_name in geo_points:
+        for field_name, dtype in dataframe.dtypes.iteritems():
+            if geo_points is not None and field_name in geo_points:
                 es_dtype = 'geo_point'
             else:
                 es_dtype = Mappings._pd_dtype_to_es_dtype(dtype)

-            mappings['properties'][column_name] = {}
-            mappings['properties'][column_name]['type'] = es_dtype
+            mappings['properties'][field_name] = {}
+            mappings['properties'][field_name]['type'] = es_dtype

         return {"mappings": mappings}

@@ -407,12 +407,12 @@ class Mappings:

         return is_source_field

-    def aggregatable_columns(self, columns=None):
+    def aggregatable_field_names(self, field_names=None):
         """
-        Return a dict of aggregatable columns from all columns or columns list
+        Return a dict of aggregatable field names, from all fields or the given `field_names` list
        {'customer_full_name': 'customer_full_name.keyword', ...}

-        Logic here is that column names are '_source' fields and keyword fields
+        Logic here is that field names are '_source' fields, and keyword fields
         may be nested beneath the field. E.g.
         customer_full_name: text
         customer_full_name.keyword: keyword
@@ -424,28 +424,28 @@ class Mappings:
         dict
             e.g. {'customer_full_name': 'customer_full_name.keyword', ...}
         """
-        if columns is None:
-            columns = self.source_fields()
+        if field_names is None:
+            field_names = self.source_fields()

         aggregatables = {}
-        for column in columns:
-            capabilities = self.field_capabilities(column)
+        for field_name in field_names:
+            capabilities = self.field_capabilities(field_name)
             if capabilities['aggregatable']:
-                aggregatables[column] = column
+                aggregatables[field_name] = field_name
             else:
-                # Try 'column.keyword'
-                column_keyword = column + '.keyword'
-                capabilities = self.field_capabilities(column_keyword)
+                # Try 'field_name.keyword'
+                field_name_keyword = field_name + '.keyword'
+                capabilities = self.field_capabilities(field_name_keyword)
                 if capabilities['aggregatable']:
-                    aggregatables[column_keyword] = column
+                    aggregatables[field_name_keyword] = field_name
                 else:
                     # Aggregations not supported for this field
-                    raise ValueError("Aggregations not supported for ", column)
+                    raise ValueError("Aggregations not supported for '{0}'".format(field_name))

         return aggregatables

-    def numeric_source_fields(self, columns, include_bool=True):
+    def numeric_source_fields(self, field_names, include_bool=True):
         """
         Returns
         -------
@@ -461,10 +461,10 @@ class Mappings:
         df = self._mappings_capabilities[(self._mappings_capabilities._source == True) &
                                          ((self._mappings_capabilities.pd_dtype == 'int64') |
                                           (self._mappings_capabilities.pd_dtype == 'float64'))]
-        # if columns exists, filter index with columns
-        if columns is not None:
-            # reindex adds NA for non-existing columns (non-numeric), so drop these after reindex
-            df = df.reindex(columns)
+        # if field_names exists, filter index with field_names
+        if field_names is not None:
+            # reindex adds NA for non-existing field_names (non-numeric), so drop these after reindex
+            df = df.reindex(field_names)
             df.dropna(inplace=True)

         # return as list
@@ -488,16 +488,16 @@ class Mappings:
         """
         return len(self.source_fields())

-    def dtypes(self, columns=None):
+    def dtypes(self, field_names=None):
         """
         Returns
         -------
         dtypes: pd.Series
             Source field name + pd_dtype
         """
-        if columns is not None:
+        if field_names is not None:
             return pd.Series(
-                {key: self._source_field_pd_dtypes[key] for key in columns})
+                {key: self._source_field_pd_dtypes[key] for key in field_names})

         return pd.Series(self._source_field_pd_dtypes)
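A quick sketch of the text/keyword fallback implemented by `aggregatable_field_names` above (mapping and field names are illustrative): a `text` field is not itself aggregatable, so the method falls back to its `.keyword` sub-field and keys the result back to the source field name:

    # 'customer_full_name' is text (not aggregatable) with a keyword sub-field,
    # 'currency' is keyword (aggregatable as-is)
    mappings.aggregatable_field_names(['customer_full_name', 'currency'])
    # -> {'customer_full_name.keyword': 'customer_full_name',
    #     'currency': 'currency'}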
diff --git a/eland/ndframe.py b/eland/ndframe.py
index 31a2c40..c98dd22 100644
--- a/eland/ndframe.py
+++ b/eland/ndframe.py
@@ -49,9 +49,7 @@ class NDFrame:
             A reference to a Elasticsearch python client
         """
         if query_compiler is None:
-            query_compiler = ElandQueryCompiler(client=client,
-                                                index_pattern=index_pattern,
-                                                columns=columns,
+            query_compiler = ElandQueryCompiler(client=client, index_pattern=index_pattern, field_names=columns,
                                                 index_field=index_field)
         self._query_compiler = query_compiler
diff --git a/eland/operations.py b/eland/operations.py
index 7f69446..aa9ba2f 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -12,7 +12,7 @@ class Operations:
     A collector of the queries and selectors we apply to queries to return the appropriate results.

    For example,
-    - a list of the columns in the DataFrame (a subset of columns in the index)
+    - a list of the field names in the DataFrame (a subset of the fields in the index)
    - a size limit on the results (e.g. for head(n=5))
    - a query to filter the results (e.g. df.A > 10)
@@ -66,26 +66,34 @@ class Operations:
         task = ('tail', (index.sort_field, n))
         self._tasks.append(task)

-    def set_columns(self, columns):
-        # Setting columns at different phases of the task list may result in different
-        # operations. So instead of setting columns once, set when it happens in call chain
-        if type(columns) is not list:
-            columns = list(columns)
+    def arithmetic_op_fields(self, field_name, op_name, left_field, right_field):
+        task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field))))
+
+        # Also set this as a field we want to retrieve
+        self.set_field_names([field_name])

-        # TODO - column renaming
-        # TODO - validate we are setting columns to a subset of last columns?
-        task = ('columns', columns)
         self._tasks.append(task)
-        # Iterate backwards through task list looking for last 'columns' task
+
+    def set_field_names(self, field_names):
+        # Setting field_names at different phases of the task list may result in different
+        # operations. So instead of setting field_names once, set when it happens in call chain
+        if type(field_names) is not list:
+            field_names = list(field_names)
+
+        # TODO - field_name renaming
+        # TODO - validate we are setting field_names to a subset of last field_names?
+        task = ('field_names', field_names)
+        self._tasks.append(task)
+        # Iterate backwards through task list looking for last 'field_names' task
         for task in reversed(self._tasks):
-            if task[0] == 'columns':
+            if task[0] == 'field_names':
                 return task[1]

         return None

-    def get_columns(self):
-        # Iterate backwards through task list looking for last 'columns' task
+    def get_field_names(self):
+        # Iterate backwards through task list looking for last 'field_names' task
         for task in reversed(self._tasks):
-            if task[0] == 'columns':
+            if task[0] == 'field_names':
                 return task[1]

         return None

@@ -103,8 +111,8 @@ class Operations:
                 "not supported {0} {1}"
                 .format(query_params, post_processing))

-        # Only return requested columns
-        fields = query_compiler.columns
+        # Only return requested field_names
+        fields = query_compiler.field_names

         counts = {}
         for field in fields:
@@ -143,13 +151,13 @@ class Operations:
         Parameters
         ----------
         field_types: str, default None
-            if `aggregatable` use only columns whose fields in elasticseach are aggregatable.
+            If `aggregatable`, use only fields that are aggregatable in Elasticsearch.
             If `None`, use only numeric fields.
         Returns
         -------
         pandas.Series
-            Series containing results of `func` applied to the column(s)
+            Series containing results of `func` applied to the field(s)
         """
         query_params, post_processing = self._resolve_tasks()
@@ -157,7 +165,7 @@ class Operations:
         if size is not None:
             raise NotImplementedError("Can not count field matches if size is set {}".format(size))

-        columns = self.get_columns()
+        field_names = self.get_field_names()

         body = Query(query_params['query'])

         # some metric aggs (including cardinality) work on all aggregatable fields
         # therefore we include an optional all parameter on operations
         # that call _metric_aggs
         if field_types=='aggregatable':
-            source_fields = query_compiler._mappings.aggregatable_columns(columns)
+            source_fields = query_compiler._mappings.aggregatable_field_names(field_names)
         else:
-            source_fields = query_compiler._mappings.numeric_source_fields(columns)
+            source_fields = query_compiler._mappings.numeric_source_fields(field_names)

         for field in source_fields:
             body.metric_aggs(field, func, field)
@@ -209,7 +217,7 @@ class Operations:
         Returns
         -------
         pandas.Series
-            Series containing results of `func` applied to the column(s)
+            Series containing results of `func` applied to the field(s)
         """
         query_params, post_processing = self._resolve_tasks()
@@ -217,14 +225,14 @@ class Operations:
         if size is not None:
             raise NotImplementedError("Can not count field matches if size is set {}".format(size))

-        columns = self.get_columns()
+        field_names = self.get_field_names()

-        # Get just aggregatable columns
-        aggregatable_columns = query_compiler._mappings.aggregatable_columns(columns)
+        # Get just the aggregatable field_names
+        aggregatable_field_names = query_compiler._mappings.aggregatable_field_names(field_names)

         body = Query(query_params['query'])

-        for field in aggregatable_columns.keys():
+        for field in aggregatable_field_names.keys():
             body.terms_aggs(field, func, field, es_size=es_size)

         response = query_compiler._client.search(
@@ -234,12 +242,12 @@ class Operations:

         results = {}

-        for key, value in aggregatable_columns.items():
-            for bucket in response['aggregations'][columns[0]]['buckets']:
+        for key, value in aggregatable_field_names.items():
+            for bucket in response['aggregations'][field_names[0]]['buckets']:
                 results[bucket['key']] = bucket['doc_count']

         try:
-            name = columns[0]
+            name = field_names[0]
         except IndexError:
             name = None
@@ -248,16 +256,16 @@ class Operations:

         return s

     def _hist_aggs(self, query_compiler, num_bins):
-        # Get histogram bins and weights for numeric columns
+        # Get histogram bins and weights for numeric fields
         query_params, post_processing = self._resolve_tasks()

         size = self._size(query_params, post_processing)
         if size is not None:
             raise NotImplementedError("Can not count field matches if size is set {}".format(size))

-        columns = self.get_columns()
+        field_names = self.get_field_names()

-        numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns)
+        numeric_source_fields = query_compiler._mappings.numeric_source_fields(field_names)

         body = Query(query_params['query'])
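For reference, a sketch of what the metric aggregations above add to the search body, assuming `Query.metric_aggs(name, func, field)` emits a standard Elasticsearch metric agg (the field name is illustrative):

    # body.metric_aggs('AvgTicketPrice', 'avg', 'AvgTicketPrice') contributes:
    #   {"aggs": {"AvgTicketPrice": {"avg": {"field": "AvgTicketPrice"}}}}
    # and the result is read back from
    #   response['aggregations']['AvgTicketPrice']['value']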
@@ -331,7 +339,7 @@ class Operations:
         Pandas supports a lot of options here, and these options generally work on text and numerics in pandas.
         Elasticsearch has metric aggs and terms aggs so will have different behaviour.

-        Pandas aggs that return columns (as opposed to transformed rows):
+        Pandas aggs that return fields (as opposed to transformed rows):

         all
         any
@@ -398,14 +406,14 @@ class Operations:
         if size is not None:
             raise NotImplementedError("Can not count field matches if size is set {}".format(size))

-        columns = self.get_columns()
+        field_names = self.get_field_names()

         body = Query(query_params['query'])

         # convert pandas aggs to ES equivalent
         es_aggs = self._map_pd_aggs_to_es_aggs(pd_aggs)

-        for field in columns:
+        for field in field_names:
             for es_agg in es_aggs:
                 # If we have multiple 'extended_stats' etc. here we simply NOOP on 2nd call
                 if isinstance(es_agg, tuple):
@@ -427,7 +435,7 @@ class Operations:
         """
         results = {}

-        for field in columns:
+        for field in field_names:
             values = list()
             for es_agg in es_aggs:
                 if isinstance(es_agg, tuple):
@@ -448,9 +456,9 @@ class Operations:
         if size is not None:
             raise NotImplementedError("Can not count field matches if size is set {}".format(size))

-        columns = self.get_columns()
+        field_names = self.get_field_names()

-        numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns, include_bool=False)
+        numeric_source_fields = query_compiler._mappings.numeric_source_fields(field_names, include_bool=False)

         # for each field we compute:
         # count, mean, std, min, 25%, 50%, 75%, max
@@ -535,10 +543,15 @@ class Operations:

         size, sort_params = Operations._query_params_to_size_and_sort(query_params)

-        body = Query(query_params['query'])
+        script_fields = query_params['query_script_fields']
+        query = Query(query_params['query'])

-        # Only return requested columns
-        columns = self.get_columns()
+        body = query.to_search_body()
+        if script_fields is not None:
+            body['script_fields'] = script_fields
+
+        # Only return requested field_names
+        field_names = self.get_field_names()

         es_results = None

@@ -551,14 +564,14 @@ class Operations:
                 index=query_compiler._index_pattern,
                 size=size,
                 sort=sort_params,
-                body=body.to_search_body(),
-                _source=columns)
+                body=body,
+                _source=field_names)
         else:
             is_scan = True
             es_results = query_compiler._client.scan(
                 index=query_compiler._index_pattern,
-                query=body.to_search_body(),
-                _source=columns)
+                query=body,
+                _source=field_names)

         # create post sort
         if sort_params is not None:
             post_processing.append(self._sort_params_to_postprocessing(sort_params))
@@ -575,9 +588,9 @@ class Operations:
             df = self._apply_df_post_processing(df, post_processing)
             collector.collect(df)

-    def iloc(self, index, columns):
-        # index and columns are indexers
-        task = ('iloc', (index, columns))
+    def iloc(self, index, field_names):
+        # index and field_names are indexers
+        task = ('iloc', (index, field_names))
         self._tasks.append(task)

     def index_count(self, query_compiler, field):
@@ -691,13 +704,13 @@ class Operations:
             df = df.sort_values(sort_field, False)
         elif action[0] == 'iloc':
             index_indexer = action[1][0]
-            column_indexer = action[1][1]
+            field_name_indexer = action[1][1]
             if index_indexer is None:
                 index_indexer = slice(None)
-            if column_indexer is None:
-                column_indexer = slice(None)
-            df = df.iloc[index_indexer, column_indexer]
-        # columns could be in here (and we ignore it)
+            if field_name_indexer is None:
+                field_name_indexer = slice(None)
+            df = df.iloc[index_indexer, field_name_indexer]
+        # field_names could be in here (and we ignore it)

         return df

@@ -710,6 +723,7 @@ class Operations:
                         "query_sort_order": None,
                         "query_size": None,
                         "query_fields": None,
+                        "query_script_fields": None,
                         "query": Query()}

         post_processing = []
@@ -727,6 +741,8 @@ class Operations:
                 query_params, post_processing = self._resolve_query_terms(task, query_params, post_processing)
             elif task[0] == 'boolean_filter':
                 query_params, post_processing = self._resolve_boolean_filter(task, query_params, post_processing)
+            elif task[0] == 'arithmetic_op_fields':
+                query_params, post_processing = self._resolve_arithmetic_op_fields(task, query_params, post_processing)
             else:
                 # a lot of operations simply post-process the dataframe - put these straight through
                 query_params, post_processing = self._resolve_post_processing_task(task, query_params, post_processing)
@@ -858,9 +874,44 @@ class Operations:

         return query_params, post_processing

+    def _resolve_arithmetic_op_fields(self, item, query_params, post_processing):
+        # task = ('arithmetic_op_fields', (field_name, (op_name, (left_field, right_field))))
+        field_name = item[1][0]
+        op_name = item[1][1][0]
+        left_field = item[1][1][1][0]
+        right_field = item[1][1][1][1]
+
+        """
+        (if op_name = 'truediv')
+
+        "script_fields": {
+            "field_name": {
+                "script": {
+                    "source": "doc[left_field].value / doc[right_field].value"
+                }
+            }
+        }
+        """
+        if op_name == 'truediv':
+            op = '/'
+        else:
+            raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
+
+        source = "doc['{0}'].value {1} doc['{2}'].value".format(left_field, op, right_field)
+
+        if query_params['query_script_fields'] is None:
+            query_params['query_script_fields'] = {}
+        query_params['query_script_fields'][field_name] = {
+            'script': {
+                'source': source
+            }
+        }
+
+        return query_params, post_processing
+
     def _resolve_post_processing_task(self, item, query_params, post_processing):
         # Just do this in post-processing
-        if item[0] != 'columns':
+        if item[0] != 'field_names':
             post_processing.append(item)

         return query_params, post_processing

@@ -885,11 +936,11 @@ class Operations:
         query_params, post_processing = self._resolve_tasks()

         size, sort_params = Operations._query_params_to_size_and_sort(query_params)
-        columns = self.get_columns()
+        field_names = self.get_field_names()

         buf.write(" size: {0}\n".format(size))
         buf.write(" sort_params: {0}\n".format(sort_params))
-        buf.write(" columns: {0}\n".format(columns))
+        buf.write(" field_names: {0}\n".format(field_names))
         buf.write(" post_processing: {0}\n".format(post_processing))

     def update_query(self, boolean_filter):
diff --git a/eland/query.py b/eland/query.py
index 80a5161..72e9129 100644
--- a/eland/query.py
+++ b/eland/query.py
@@ -15,10 +15,12 @@ class Query:
     def __init__(self, query=None):
         if query is None:
             self._query = BooleanFilter()
+            self._script_fields = {}
             self._aggs = {}
         else:
             # Deep copy the incoming query so we can change it
             self._query = deepcopy(query._query)
+            self._script_fields = deepcopy(query._script_fields)
             self._aggs = deepcopy(query._aggs)

     def exists(self, field, must=True):
@@ -157,5 +159,14 @@ class Query:
         else:
             self._query = self._query & boolean_filter

+    def arithmetic_op_fields(self, field_name, op_name, left_field, right_field):
+        # `_script_fields` is a plain dict; register a script_field that computes
+        # `left_field <op> right_field` (mirrors Operations._resolve_arithmetic_op_fields)
+        if op_name == 'truediv':
+            op = '/'
+        else:
+            raise NotImplementedError("Not implemented operation '{0}'".format(op_name))
+
+        source = "doc['{0}'].value {1} doc['{2}'].value".format(left_field, op, right_field)
+        self._script_fields[field_name] = {'script': {'source': source}}
+
     def __repr__(self):
         return repr(self.to_search_body())
+
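For the truediv case this resolves to, the search body built in `_es_results` above gains a `script_fields` section shaped like this (field names are illustrative). Elasticsearch returns each script field under `hit['fields']` rather than `hit['_source']`, which is why the query_compiler changes below also read from `'fields'`:

    "script_fields": {
      "taxful_total_price_truediv_total_quantity": {
        "script": {
          "source": "doc['taxful_total_price'].value / doc['total_quantity'].value"
        }
      }
    }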
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index 102221f..e94e605 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -38,12 +38,8 @@ class ElandQueryCompiler:
     A way to mitigate this would be to post process this drop - TODO
     """

-    def __init__(self,
-                 client=None,
-                 index_pattern=None,
-                 columns=None,
-                 index_field=None,
-                 operations=None):
+    def __init__(self, client=None, index_pattern=None, field_names=None, index_field=None, operations=None,
+                 name_mapper=None):
         self._client = Client(client)
         self._index_pattern = index_pattern

@@ -58,29 +54,53 @@ class ElandQueryCompiler:
         else:
             self._operations = operations

-        if columns is not None:
-            self.columns = columns
+        if field_names is not None:
+            self.field_names = field_names
+
+        if name_mapper is None:
+            self._name_mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
+        else:
+            self._name_mapper = name_mapper

     def _get_index(self):
         return self._index

+    def _get_field_names(self):
+        field_names = self._operations.get_field_names()
+        if field_names is None:
+            # default to all
+            field_names = self._mappings.source_fields()
+
+        return pd.Index(field_names)
+
+    def _set_field_names(self, field_names):
+        self._operations.set_field_names(field_names)
+
+    field_names = property(_get_field_names, _set_field_names)
+
     def _get_columns(self):
-        columns = self._operations.get_columns()
+        columns = self._operations.get_field_names()
         if columns is None:
             # default to all
             columns = self._mappings.source_fields()

+        # map renames
+        columns = self._name_mapper.field_to_display_names(columns)
+
         return pd.Index(columns)

     def _set_columns(self, columns):
-        self._operations.set_columns(columns)
+        # map renames
+        columns = self._name_mapper.display_to_field_names(columns)
+
+        self._operations.set_field_names(columns)

     columns = property(_get_columns, _set_columns)
     index = property(_get_index)

     @property
     def dtypes(self):
-        columns = self._operations.get_columns()
+        columns = self._operations.get_field_names()

         return self._mappings.dtypes(columns)

@@ -194,6 +214,12 @@ class ElandQueryCompiler:

             row = hit['_source']

+            # script_fields appear in 'fields'
+            if 'fields' in hit:
+                fields = hit['fields']
+                for key, value in fields.items():
+                    row[key] = value
+
             # get index value - can be _id or can be field value in source
             if self._index.is_source_field:
                 index_field = row[self._index.index_field]
@@ -221,6 +247,10 @@ class ElandQueryCompiler:
                 is_source_field, pd_dtype = self._mappings.source_field_pd_dtype(missing)
                 df[missing] = pd.Series(dtype=pd_dtype)

+        # Rename columns
+        if not self._name_mapper.empty:
+            df.rename(columns=self._name_mapper.display_names_mapper(), inplace=True)
+
         # Sort columns in mapping order
         df = df[self.columns]

@@ -267,6 +297,8 @@ class ElandQueryCompiler:
                         out[field_name].append(x)
                     else:
                         out[field_name] = x
+                else:
+                    out[name[:-1]] = x

             flatten(y)

@@ -307,13 +339,16 @@ class ElandQueryCompiler:
         return df

     def copy(self):
-        return ElandQueryCompiler(
-            client=self._client,
-            index_pattern=self._index_pattern,
-            columns=None,  # columns are embedded in operations
-            index_field=self._index.index_field,
-            operations=self._operations.copy()
-        )
+        return ElandQueryCompiler(client=self._client, index_pattern=self._index_pattern, field_names=None,
+                                  index_field=self._index.index_field, operations=self._operations.copy(),
+                                  name_mapper=self._name_mapper.copy())
+
+    def rename(self, renames):
+        result = self.copy()
+
+        result._name_mapper.rename_display_name(renames)
+
+        return result

     def head(self, n):
         result = self.copy()
@@ -364,14 +399,7 @@ class ElandQueryCompiler:
         if numeric:
             raise NotImplementedError("Not implemented yet...")

-        result._operations.set_columns(list(key))
-
-        return result
-
-    def view(self, index=None, columns=None):
-        result = self.copy()
-
-        result._operations.iloc(index, columns)
+        result._operations.set_field_names(list(key))

         return result

@@ -382,7 +410,7 @@ class ElandQueryCompiler:
         if columns is not None:
             # columns is a pandas.Index so we can use pandas drop feature
             new_columns = self.columns.drop(columns)
-            result._operations.set_columns(new_columns.to_list())
+            result._operations.set_field_names(new_columns.to_list())

         if index is not None:
             result._operations.drop_index_values(self, self.index.index_field, index)
@@ -433,3 +461,141 @@ class ElandQueryCompiler:

         return result

+    def check_arithmetics(self, right):
+        """
+        Compare 2 query_compilers to see if arithmetic operations can be performed by the NDFrame object.
+
+        This does very basic comparisons and ignores some of the complexities of incompatible task lists
+
+        Raises exception if incompatible
+
+        Parameters
+        ----------
+        right: ElandQueryCompiler
+            The query compiler to compare self to
+
+        Raises
+        ------
+        TypeError, ValueError
+            If arithmetic operations aren't possible
+        """
+        if not isinstance(right, ElandQueryCompiler):
+            raise TypeError(
+                "Incompatible types "
+                "{0} != {1}".format(type(self), type(right))
+            )
+
+        if self._client._es != right._client._es:
+            raise ValueError(
+                "Can not perform arithmetic operations across different clients "
+                "{0} != {1}".format(self._client._es, right._client._es)
+            )
+
+        if self._index.index_field != right._index.index_field:
+            raise ValueError(
+                "Can not perform arithmetic operations across different index fields "
+                "{0} != {1}".format(self._index.index_field, right._index.index_field)
+            )
+
+        if self._index_pattern != right._index_pattern:
+            raise ValueError(
+                "Can not perform arithmetic operations across different index patterns "
+                "{0} != {1}".format(self._index_pattern, right._index_pattern)
+            )
+
+    def arithmetic_op_fields(self, field_name, op, left_field, right_field):
+        result = self.copy()
+
+        result._operations.arithmetic_op_fields(field_name, op, left_field, right_field)
+
+        return result
+
+    """
+    Internal class to deal with column renaming and script_fields
+    """
+    class DisplayNameToFieldNameMapper:
+        def __init__(self,
+                     field_to_display_names=None,
+                     display_to_field_names=None):
+
+            if field_to_display_names is not None:
+                self._field_to_display_names = field_to_display_names
+            else:
+                self._field_to_display_names = dict()
+
+            if display_to_field_names is not None:
+                self._display_to_field_names = display_to_field_names
+            else:
+                self._display_to_field_names = dict()
+
+        def rename_display_name(self, renames):
+            for current_display_name, new_display_name in renames.items():
+                if current_display_name in self._display_to_field_names:
+                    # has been renamed already - update name
+                    field_name = self._display_to_field_names[current_display_name]
+                    del self._display_to_field_names[current_display_name]
+                    del self._field_to_display_names[field_name]
+                    self._display_to_field_names[new_display_name] = field_name
+                    self._field_to_display_names[field_name] = new_display_name
+                else:
+                    # new rename - assume 'current_display_name' is 'field_name'
+                    field_name = current_display_name
+
+                    # if field_name is already mapped ignore
+                    if field_name not in self._field_to_display_names:
+                        self._display_to_field_names[new_display_name] = field_name
+                        self._field_to_display_names[field_name] = new_display_name
+
+        def field_names_to_list(self):
+            # return a sorted list (not a dict view) so equality checks against
+            # plain lists work
+            return sorted(list(self._field_to_display_names.keys()))
+
+        def display_names_to_list(self):
+            return sorted(list(self._display_to_field_names.keys()))
+
+        # Return mapper values as dict
+        def display_names_mapper(self):
+            return self._field_to_display_names
+
+        @property
+        def empty(self):
+            return not self._display_to_field_names
+
+        def field_to_display_names(self, field_names):
+            if self.empty:
+                return field_names
+
+            display_names = []
+
+            for field_name in field_names:
+                if field_name in self._field_to_display_names:
+                    display_name = self._field_to_display_names[field_name]
+                else:
+                    display_name = field_name
+                display_names.append(display_name)
+
+            return display_names
+
+        def display_to_field_names(self, display_names):
+            if self.empty:
+                return display_names
+
+            field_names = []
+
+            for display_name in display_names:
+                if display_name in self._display_to_field_names:
+                    field_name = self._display_to_field_names[display_name]
+                else:
+                    field_name = display_name
+                field_names.append(field_name)
+
+            return field_names
+
+        def __constructor__(self, *args, **kwargs):
+            return type(self)(*args, **kwargs)
+
+        def copy(self):
+            # copy the dicts so renames on the copy don't mutate the original
+            return self.__constructor__(
+                field_to_display_names=self._field_to_display_names.copy(),
+                display_to_field_names=self._display_to_field_names.copy()
+            )
+
diff --git a/eland/series.py b/eland/series.py
index 6318028..9c96003 100644
--- a/eland/series.py
+++ b/eland/series.py
@@ -15,7 +15,7 @@
 Based on NDFrame which underpins eland.DataFrame
 """

-import warnings
+from io import StringIO

 import pandas as pd

@@ -98,6 +98,20 @@ class Series(NDFrame):

     name = property(_get_name)

+    def rename(self, new_name):
+        """
+        ONLY COLUMN rename supported
+
+        Parameters
+        ----------
+        new_name: str
+            The new display name for the Series
+
+        Returns
+        -------
+        eland.Series
+            A copy of this Series with the display name set to `new_name`
+        """
+        return Series(query_compiler=self._query_compiler.rename({self.name: new_name}))
+
     def head(self, n=5):
         return Series(query_compiler=self._query_compiler.head(n))

@@ -141,7 +155,7 @@ class Series(NDFrame):
         """
         if not isinstance(es_size, int):
             raise TypeError("es_size must be a positive integer.")
-        if not es_size>0:
+        if not es_size > 0:
             raise ValueError("es_size must be a positive integer.")

         return self._query_compiler.value_counts(es_size)
@@ -276,3 +290,35 @@ class Series(NDFrame):
         """
         return 1
+
+    def info_es(self):
+        buf = StringIO()
+
+        super()._info_es(buf)
+
+        return buf.getvalue()
+
+    def __truediv__(self, right):
+        return self.truediv(right)
+
+    def truediv(self, right):
+        """
+        return a / b
+
+        a and b are both eland.Series and must share the same eland.Client,
+        index_pattern and index_field
+        """
+        if isinstance(right, Series):
+            # Check compatibility
+            self._query_compiler.check_arithmetics(right._query_compiler)
+
+            field_name = "{0}_{1}_{2}".format(self.name, "truediv", right.name)
+
+            # Compatible, so create new Series
+            return Series(query_compiler=self._query_compiler.arithmetic_op_fields(
+                field_name, 'truediv', self.name, right.name))
+        else:
+            raise TypeError(
+                "Can only perform arithmetic operation on selected types "
+                "{0} != {1}".format(type(self), type(right))
+            )
diff --git a/eland/tests/client/__init__.py b/eland/tests/client/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/eland/tests/client/test_eq_pytest.py b/eland/tests/client/test_eq_pytest.py
new file mode 100644
index 0000000..332d6f4
--- /dev/null
+++ b/eland/tests/client/test_eq_pytest.py
@@ -0,0 +1,28 @@
+# File called _pytest for PyCharm compatibility
+from elasticsearch import Elasticsearch
+
+import eland as ed
+from eland.tests.common import TestData
+
+import pytest
+
+
+class TestClientEq(TestData):
+
+    def test_self_eq(self):
+        es = Elasticsearch('localhost')
+
+        client = ed.Client(es)
+
+        assert client != es
+
+        assert client == client
+
+    def test_non_self_ne(self):
+        es1 = Elasticsearch('localhost')
+        es2 = Elasticsearch('localhost')
+
+        client1 = ed.Client(es1)
+        client2 = ed.Client(es2)
+
+        assert client1 != client2
diff --git a/eland/tests/common.py b/eland/tests/common.py
index 107b5ab..6549d73 100644
--- a/eland/tests/common.py
+++ b/eland/tests/common.py
@@ -80,7 +80,7 @@ def assert_eland_frame_equal(left, right):
     assert_frame_equal(left._to_pandas(), right._to_pandas())


-def assert_pandas_eland_series_equal(left, right):
+def assert_pandas_eland_series_equal(left, right, check_less_precise=False):
     if not isinstance(left, pd.Series):
         raise AssertionError("Expected type {exp_type}, found {act_type} instead".format(
             exp_type='pd.Series', act_type=type(left)))
@@ -90,4 +90,4 @@ def assert_pandas_eland_series_equal(left, right):
         exp_type='ed.Series', act_type=type(right)))

     # Use pandas tests to check similarity
-    assert_series_equal(left, right._to_pandas())
+    assert_series_equal(left, right._to_pandas(), check_less_precise=check_less_precise)
diff --git a/eland/tests/mappings/test_aggregatables_pytest.py b/eland/tests/mappings/test_aggregatables_pytest.py
index 8d12f17..9d27ba7 100644
--- a/eland/tests/mappings/test_aggregatables_pytest.py
+++ b/eland/tests/mappings/test_aggregatables_pytest.py
@@ -8,7 +8,7 @@ class TestMappingsAggregatables(TestData):
     def test_ecommerce_all_aggregatables(self):
         ed_ecommerce = self.ed_ecommerce()

-        aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_columns()
+        aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_field_names()

         expected = {'category.keyword': 'category',
                     'currency': 'currency',
@@ -67,6 +67,6 @@ class TestMappingsAggregatables(TestData):
                     'customer_first_name.keyword': 'customer_first_name',
                     'type': 'type',
                     'user': 'user'}

-        aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_columns(expected.values())
+        aggregatables = ed_ecommerce._query_compiler._mappings.aggregatable_field_names(expected.values())

         assert expected == aggregatables
diff --git a/eland/tests/mappings/test_dtypes_pytest.py b/eland/tests/mappings/test_dtypes_pytest.py
index 43d3e3e..e467e9a 100644
--- a/eland/tests/mappings/test_dtypes_pytest.py
+++ b/eland/tests/mappings/test_dtypes_pytest.py
@@ -21,6 +21,6 @@ class TestMappingsDtypes(TestData):
         pd_flights = self.pd_flights()[['Carrier', 'AvgTicketPrice', 'Cancelled']]
         pd_dtypes = pd_flights.dtypes

-        ed_dtypes = ed_flights._query_compiler._mappings.dtypes(columns=['Carrier', 'AvgTicketPrice', 'Cancelled'])
+        ed_dtypes = ed_flights._query_compiler._mappings.dtypes(field_names=['Carrier', 'AvgTicketPrice', 'Cancelled'])

         assert_series_equal(pd_dtypes, ed_dtypes)
diff --git a/eland/tests/mappings/test_numeric_source_fields_pytest.py b/eland/tests/mappings/test_numeric_source_fields_pytest.py
index 9611a1f..a63e94d 100644
--- a/eland/tests/mappings/test_numeric_source_fields_pytest.py
+++ b/eland/tests/mappings/test_numeric_source_fields_pytest.py
@@ -13,13 +13,13 @@ class TestMappingsNumericSourceFields(TestData):
         ed_flights = self.ed_flights()
         pd_flights = self.pd_flights()

-        ed_numeric = ed_flights._query_compiler._mappings.numeric_source_fields(columns=None, include_bool=False)
+        ed_numeric = ed_flights._query_compiler._mappings.numeric_source_fields(field_names=None, include_bool=False)
         pd_numeric = pd_flights.select_dtypes(include=np.number)

         assert pd_numeric.columns.to_list() == ed_numeric

     def test_ecommerce_selected_non_numeric_source_fields(self):
-        columns = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'user']
+        field_names = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'user']
         """
         Note: none of these are numeric
         category                   object
@@ -29,16 +29,16 @@ class TestMappingsNumericSourceFields(TestData):
         user                       object
         """

-        ed_ecommerce = self.ed_ecommerce()[columns]
-        pd_ecommerce = self.pd_ecommerce()[columns]
+        ed_ecommerce = self.ed_ecommerce()[field_names]
+        pd_ecommerce = self.pd_ecommerce()[field_names]

-        ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False)
+        ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False)
         pd_numeric = pd_ecommerce.select_dtypes(include=np.number)

         assert pd_numeric.columns.to_list() == ed_numeric

     def test_ecommerce_selected_mixed_numeric_source_fields(self):
-        columns = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'total_quantity', 'user']
+        field_names = ['category', 'currency', 'customer_birth_date', 'customer_first_name', 'total_quantity', 'user']
         """
         Note: one is numeric
@@ -50,16 +50,16 @@ class TestMappingsNumericSourceFields(TestData):
         user                       object
         """

-        ed_ecommerce = self.ed_ecommerce()[columns]
-        pd_ecommerce = self.pd_ecommerce()[columns]
+        ed_ecommerce = self.ed_ecommerce()[field_names]
+        pd_ecommerce = self.pd_ecommerce()[field_names]

-        ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False)
+        ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False)
         pd_numeric = pd_ecommerce.select_dtypes(include=np.number)

         assert pd_numeric.columns.to_list() == ed_numeric

     def test_ecommerce_selected_all_numeric_source_fields(self):
-        columns = ['total_quantity', 'taxful_total_price', 'taxless_total_price']
+        field_names = ['total_quantity', 'taxful_total_price', 'taxless_total_price']
         """
         Note: all are numeric
@@ -68,10 +68,10 @@ class TestMappingsNumericSourceFields(TestData):
         taxless_total_price    float64
         """

-        ed_ecommerce = self.ed_ecommerce()[columns]
-        pd_ecommerce = self.pd_ecommerce()[columns]
+        ed_ecommerce = self.ed_ecommerce()[field_names]
+        pd_ecommerce = self.pd_ecommerce()[field_names]

-        ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(columns=columns, include_bool=False)
+        ed_numeric = ed_ecommerce._query_compiler._mappings.numeric_source_fields(field_names=field_names, include_bool=False)
         pd_numeric = pd_ecommerce.select_dtypes(include=np.number)

         assert pd_numeric.columns.to_list() == ed_numeric
diff --git a/eland/tests/query_compiler/test_rename_pytest.py b/eland/tests/query_compiler/test_rename_pytest.py
new file mode 100644
index 0000000..3948044
--- /dev/null
+++ b/eland/tests/query_compiler/test_rename_pytest.py
@@ -0,0 +1,75 @@
+# File called _pytest for PyCharm compatibility
+import pandas as pd
+
+from pandas.util.testing import assert_series_equal
+
+from eland import ElandQueryCompiler
+from eland.tests.common import TestData
+
+
+class TestQueryCompilerRename(TestData):
+
+    def test_query_compiler_basic_rename(self):
+        field_names = []
+        display_names = []
+
+        mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
+
+        assert field_names == mapper.field_names_to_list()
+        assert display_names == mapper.display_names_to_list()
+
+        field_names = ['a']
+        display_names = ['A']
+        update_A = {'a': 'A'}
+        mapper.rename_display_name(update_A)
+
+        assert field_names == mapper.field_names_to_list()
+        assert display_names == mapper.display_names_to_list()
+
+        field_names = ['a', 'b']
+        display_names = ['A', 'B']
+
+        update_B = {'b': 'B'}
+        mapper.rename_display_name(update_B)
+
+        assert field_names == mapper.field_names_to_list()
+        assert display_names == mapper.display_names_to_list()
+
+        field_names = ['a', 'b']
+        display_names = ['AA', 'B']
+
+        update_AA = {'A': 'AA'}
+        mapper.rename_display_name(update_AA)
+
+        assert field_names == mapper.field_names_to_list()
+        assert display_names == mapper.display_names_to_list()
+
+    def test_query_compiler_basic_rename_columns(self):
+        columns = ['a', 'b', 'c', 'd']
+
+        mapper = ElandQueryCompiler.DisplayNameToFieldNameMapper()
+
+        display_names = ['A', 'b', 'c', 'd']
+        update_A = {'a': 'A'}
+        mapper.rename_display_name(update_A)
+
+        assert display_names == mapper.field_to_display_names(columns)
+
+        # Invalid update
+        display_names = ['A', 'b', 'c', 'd']
+        update_ZZ = {'a': 'ZZ'}
+        mapper.rename_display_name(update_ZZ)
+
+        assert display_names == mapper.field_to_display_names(columns)
+
+        display_names = ['AA', 'b', 'c', 'd']
+        update_AA = {'A': 'AA'}  # already renamed to 'A'
+        mapper.rename_display_name(update_AA)
+
+        assert display_names == mapper.field_to_display_names(columns)
+
+        display_names = ['AA', 'b', 'C', 'd']
+        update_AA_C = {'a': 'AA', 'c': 'C'}  # 'a' rename ignored
+        mapper.rename_display_name(update_AA_C)
+
+        assert display_names == mapper.field_to_display_names(columns)
diff --git a/eland/tests/series/test_arithmetics_pytest.py b/eland/tests/series/test_arithmetics_pytest.py
new file mode 100644
index 0000000..589853c
--- /dev/null
+++ b/eland/tests/series/test_arithmetics_pytest.py
@@ -0,0 +1,50 @@
+# File called _pytest for PyCharm compatibility
+import pytest
+
+from eland.tests.common import TestData, assert_pandas_eland_series_equal
+
+
+class TestSeriesArithmetics(TestData):
+
+    def test_ecommerce_series_invalid_div(self):
+        pd_df = self.pd_ecommerce()
+        ed_df = self.ed_ecommerce()
+
+        # eland / pandas == error
+        with pytest.raises(TypeError):
+            ed_df['total_quantity'] / pd_df['taxful_total_price']
+
+    def test_ecommerce_series_div(self):
+        pd_df = self.pd_ecommerce()
+        ed_df = self.ed_ecommerce()
+
+        pd_avg_price = pd_df['total_quantity'] / pd_df['taxful_total_price']
+        print(pd_avg_price)  # this has None as name
+
+        ed_avg_price = ed_df['total_quantity'] / ed_df['taxful_total_price']
+        print(ed_avg_price)
+
+        assert_pandas_eland_series_equal(pd_avg_price, ed_avg_price, check_less_precise=True)
+
+    def test_ecommerce_series_div_float(self):
+        pd_df = self.pd_ecommerce()
+        ed_df = self.ed_ecommerce()
+
+        pd_avg_price = pd_df['total_quantity'] / 10.0
+        print(pd_avg_price)
+
+        # eland Series / scalar is not implemented by this change - truediv
+        # only accepts another eland.Series, so expect a TypeError for now
+        with pytest.raises(TypeError):
+            ed_df['total_quantity'] / 10.0
+
+    def test_ecommerce_series_div_other(self):
+        ed_df = self.ed_ecommerce()
+
+        ed_s1 = ed_df.total_quantity
+        ed_s2 = ed_df.taxful_total_price
+
+        print(ed_s1)
+        print(ed_s2)
diff --git a/eland/tests/series/test_rename_pytest.py b/eland/tests/series/test_rename_pytest.py
new file mode 100644
index 0000000..89eb7f7
--- /dev/null
+++ b/eland/tests/series/test_rename_pytest.py
@@ -0,0 +1,23 @@
+# File called _pytest for PyCharm compatibility
+import eland as ed
+from eland.tests import ELASTICSEARCH_HOST
+from eland.tests import FLIGHTS_INDEX_NAME
+from eland.tests.common import TestData
+from eland.tests.common import assert_pandas_eland_series_equal
+
+
+class TestSeriesRename(TestData):
+
+    def test_rename(self):
+        pd_carrier = self.pd_flights()['Carrier']
+        ed_carrier = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier')
+
+        assert_pandas_eland_series_equal(pd_carrier, ed_carrier)
+
+        pd_renamed = pd_carrier.rename("renamed")
+        ed_renamed = ed_carrier.rename("renamed")
+
+        assert_pandas_eland_series_equal(pd_renamed, ed_renamed)
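A usage sketch of the two features this change adds - Series.rename and Series / Series division - assuming a local cluster with the example ecommerce index loaded (host and index names are illustrative):

    import eland as ed

    df = ed.DataFrame('localhost', 'ecommerce')

    # display-name-only rename; the Elasticsearch field is still 'taxful_total_price'
    total = df['taxful_total_price'].rename('total')

    # computed server-side as a script_field:
    #   doc['taxful_total_price'].value / doc['total_quantity'].value
    avg_item_price = df['taxful_total_price'] / df['total_quantity']
    print(avg_item_price.head())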