From d71ce9f50c534a5d9f059a4353a4b5672d74dc0d Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Thu, 11 Jul 2019 10:11:57 +0000 Subject: [PATCH] Adding drop + the ability for operations to have a query Significant refactor - needs cleanup --- eland/__init__.py | 1 + eland/client.py | 11 +- eland/dataframe.py | 155 ++++++- eland/docs/dataframe_supported.rst | 422 ++++++++++++++++++ eland/index.py | 1 + eland/mappings.py | 51 ++- eland/ndframe.py | 131 +++++- eland/operations.py | 388 ++++++++++++---- eland/query.py | 93 ++++ eland/query_compiler.py | 58 ++- eland/tests/common.py | 12 + eland/tests/dataframe/test_count_pytest.py | 14 +- eland/tests/dataframe/test_describe_pytest.py | 40 ++ eland/tests/dataframe/test_drop_pytest.py | 56 +++ eland/tests/dataframe/test_getitem_pytest.py | 16 + eland/tests/dataframe/test_iloc_pytest.py | 22 - eland/tests/dataframe/test_info_pytest.py | 15 +- eland/tests/dataframe/test_metrics_pytest.py | 46 ++ eland/tests/dataframe/test_sum_pytest.py | 20 + eland/tests/mappings/__init__.py | 0 eland/tests/mappings/test_dtypes_pytest.py | 44 ++ eland/tests/query/test_count_pytest.py | 27 ++ eland/utils.py | 8 +- 23 files changed, 1484 insertions(+), 147 deletions(-) create mode 100644 eland/query.py create mode 100644 eland/tests/dataframe/test_describe_pytest.py create mode 100644 eland/tests/dataframe/test_drop_pytest.py create mode 100644 eland/tests/dataframe/test_metrics_pytest.py create mode 100644 eland/tests/dataframe/test_sum_pytest.py create mode 100644 eland/tests/mappings/__init__.py create mode 100644 eland/tests/mappings/test_dtypes_pytest.py create mode 100644 eland/tests/query/test_count_pytest.py diff --git a/eland/__init__.py b/eland/__init__.py index bf309ea..b87a5eb 100644 --- a/eland/__init__.py +++ b/eland/__init__.py @@ -7,6 +7,7 @@ os.environ["MODIN_BACKEND"] = 'pandas' from .client import * from .index import * from .mappings import * +from .query import * from .operations import * from .query_compiler import * from .ndframe import * diff --git a/eland/client.py b/eland/client.py index 5806481..419170b 100644 --- a/eland/client.py +++ b/eland/client.py @@ -12,7 +12,16 @@ class Client: self._es = es._es else: self._es = Elasticsearch(es) - + + def index_create(self, **kwargs): + return self._es.indices.create(**kwargs) + + def index_delete(self, **kwargs): + return self._es.indices.delete(**kwargs) + + def index_exists(self, **kwargs): + return self._es.indices.exists(**kwargs) + def get_mapping(self, **kwargs): return self._es.indices.get_mapping(**kwargs) diff --git a/eland/dataframe.py b/eland/dataframe.py index c9a35c9..ddf3e28 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -16,7 +16,7 @@ from eland import Series class DataFrame(NDFrame): - # TODO create effectively 2 constructors + # This is effectively 2 constructors # 1. client, index_pattern, columns, index_field # 2. query_compiler def __init__(self, @@ -74,6 +74,22 @@ class DataFrame(NDFrame): return buf.getvalue() + def count(self): + """ + Count non-NA cells for each column (TODO row) + + Counts are based on exists queries against ES + + This is inefficient, as it creates N queries (N is number of fields). + + An alternative approach is to use value_count aggregations. However, they have issues in that: + 1. They can only be used with aggregatable fields (e.g. keyword not text) + 2. For list fields they return multiple counts. E.g. tags=['elastic', 'ml'] returns value_count=2 + for a single document. 
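+
+        For illustration, the body generated for a single field and sent to the
+        Elasticsearch _count API looks like the following sketch (built via
+        eland.Query; the field name is just an example from the test data):
+
+            {"query": {"bool": {"must": [{"exists": {"field": "AvgTicketPrice"}}], "must_not": []}}}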
+ """ + return self._query_compiler.count() + + def info_es(self): buf = StringIO() @@ -81,6 +97,130 @@ class DataFrame(NDFrame): return buf.getvalue() + def _index_summary(self): + head = self.head(1)._to_pandas().index[0] + tail = self.tail(1)._to_pandas().index[0] + index_summary = ', %s to %s' % (pprint_thing(head), + pprint_thing(tail)) + + name = "Index" + return '%s: %s entries%s' % (name, len(self), index_summary) + + def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None, + null_counts=None): + """ + Print a concise summary of a DataFrame. + + This method prints information about a DataFrame including + the index dtype and column dtypes, non-null values and memory usage. + + This copies a lot of code from pandas.DataFrame.info as it is difficult + to split out the appropriate code or creating a SparseDataFrame gives + incorrect results on types and counts. + """ + if buf is None: # pragma: no cover + buf = sys.stdout + + lines = [] + + lines.append(str(type(self))) + lines.append(self._index_summary()) + + if len(self.columns) == 0: + lines.append('Empty {name}'.format(name=type(self).__name__)) + fmt.buffer_put_lines(buf, lines) + return + + cols = self.columns + + # hack + if max_cols is None: + max_cols = pd.get_option('display.max_info_columns', + len(self.columns) + 1) + + max_rows = pd.get_option('display.max_info_rows', len(self) + 1) + + if null_counts is None: + show_counts = ((len(self.columns) <= max_cols) and + (len(self) < max_rows)) + else: + show_counts = null_counts + exceeds_info_cols = len(self.columns) > max_cols + + # From pandas.DataFrame + def _put_str(s, space): + return '{s}'.format(s=s)[:space].ljust(space) + + def _verbose_repr(): + lines.append('Data columns (total %d columns):' % + len(self.columns)) + space = max(len(pprint_thing(k)) for k in self.columns) + 4 + counts = None + + tmpl = "{count}{dtype}" + if show_counts: + counts = self.count() + if len(cols) != len(counts): # pragma: no cover + raise AssertionError( + 'Columns must equal counts ' + '({cols:d} != {counts:d})'.format( + cols=len(cols), counts=len(counts))) + tmpl = "{count} non-null {dtype}" + + dtypes = self.dtypes + for i, col in enumerate(self.columns): + dtype = dtypes.iloc[i] + col = pprint_thing(col) + + count = "" + if show_counts: + count = counts.iloc[i] + + lines.append(_put_str(col, space) + tmpl.format(count=count, + dtype=dtype)) + + def _non_verbose_repr(): + lines.append(self.columns._summary(name='Columns')) + + def _sizeof_fmt(num, size_qualifier): + # returns size in human readable format + for x in ['bytes', 'KB', 'MB', 'GB', 'TB']: + if num < 1024.0: + return ("{num:3.1f}{size_q} " + "{x}".format(num=num, size_q=size_qualifier, x=x)) + num /= 1024.0 + return "{num:3.1f}{size_q} {pb}".format(num=num, + size_q=size_qualifier, + pb='PB') + + if verbose: + _verbose_repr() + elif verbose is False: # specifically set to False, not nesc None + _non_verbose_repr() + else: + if exceeds_info_cols: + _non_verbose_repr() + else: + _verbose_repr() + + counts = self.get_dtype_counts() + dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k + in sorted(counts.items())] + lines.append('dtypes: {types}'.format(types=', '.join(dtypes))) + + if memory_usage is None: + memory_usage = pd.get_option('display.memory_usage') + if memory_usage: + # append memory usage of df to display + size_qualifier = '' + + # TODO - this is different from pd.DataFrame as we shouldn't + # really hold much in memory. 
For now just approximate with getsizeof + ignore deep
+        mem_usage = sys.getsizeof(self)
+        lines.append("memory usage: {mem}\n".format(
+            mem=_sizeof_fmt(mem_usage, size_qualifier)))
+
+        fmt.buffer_put_lines(buf, lines)
 
     def to_string(self, buf=None, columns=None, col_space=None, header=True,
@@ -153,7 +293,7 @@ class DataFrame(NDFrame):
 
     def _getitem_column(self, key):
         if key not in self.columns:
-            raise KeyError("{}".format(key))
+            raise KeyError("Requested column {} is not in the DataFrame".format(key))
         s = self._reduce_dimension(self._query_compiler.getitem_column_array([key]))
         s._parent = self
         return s
@@ -196,6 +336,17 @@ class DataFrame(NDFrame):
             query_compiler=self._query_compiler.getitem_column_array(key)
         )
 
+    def _create_or_update_from_compiler(self, new_query_compiler, inplace=False):
+        """Returns or updates a DataFrame given new query_compiler"""
+        assert (
+            isinstance(new_query_compiler, type(self._query_compiler))
+            or type(new_query_compiler) in self._query_compiler.__class__.__bases__
+        ), "Invalid Query Compiler object: {}".format(type(new_query_compiler))
+        if not inplace:
+            return DataFrame(query_compiler=new_query_compiler)
+        else:
+            self._query_compiler = new_query_compiler
+
     def _reduce_dimension(self, query_compiler):
         return Series(query_compiler=query_compiler)
 
diff --git a/eland/docs/dataframe_supported.rst b/eland/docs/dataframe_supported.rst
index e239e32..44abfdc 100644
--- a/eland/docs/dataframe_supported.rst
+++ b/eland/docs/dataframe_supported.rst
@@ -12,6 +12,428 @@ the method in the left column. ``Y`` stands for yes, ``N`` stands for no, ``P``
 for partial (meaning some parameters may not be supported yet), and ``D`` stands
 for default to pandas.
 
+The method usage counts in https://github.com/adgirish/kaggleScape/blob/master/results/annotResults.csv represent a prioritised list for implementation.
+ ++-------------------------+-------+------------------------------------------------+ +| Method | Count | Notes | ++-------------------------+-------+------------------------------------------------+ +| pd.read_csv | 1422 | Not implemented ed.read_es implemented instead | ++-------------------------+-------+------------------------------------------------+ +| pd.DataFrame | 886 | y | ++-------------------------+-------+------------------------------------------------+ +| df.append | 792 | Not implemented | ++-------------------------+-------+------------------------------------------------+ +| df.mean | 783 | y | ++-------------------------+-------+------------------------------------------------+ +| df.head | 783 | y | ++-------------------------+-------+------------------------------------------------+ +| df.drop | 761 | | ++-------------------------+-------+------------------------------------------------+ +| df.sum | 755 | y | ++-------------------------+-------+------------------------------------------------+ +| df.to_csv | 693 | | ++-------------------------+-------+------------------------------------------------+ +| df.get | 669 | | ++-------------------------+-------+------------------------------------------------+ +| df.mode | 653 | | ++-------------------------+-------+------------------------------------------------+ +| df.astype | 649 | | ++-------------------------+-------+------------------------------------------------+ +| df.sub | 637 | | ++-------------------------+-------+------------------------------------------------+ +| pd.concat | 582 | | ++-------------------------+-------+------------------------------------------------+ +| df.apply | 577 | | ++-------------------------+-------+------------------------------------------------+ +| df.groupby | 557 | | ++-------------------------+-------+------------------------------------------------+ +| df.join | 544 | | ++-------------------------+-------+------------------------------------------------+ +| df.fillna | 543 | | ++-------------------------+-------+------------------------------------------------+ +| df.max | 508 | | ++-------------------------+-------+------------------------------------------------+ +| df.reset_index | 434 | | ++-------------------------+-------+------------------------------------------------+ +| pd.unique | 433 | | ++-------------------------+-------+------------------------------------------------+ +| df.le | 405 | | ++-------------------------+-------+------------------------------------------------+ +| df.count | 399 | | ++-------------------------+-------+------------------------------------------------+ +| pd.value_counts | 397 | | ++-------------------------+-------+------------------------------------------------+ +| df.sort_values | 390 | | ++-------------------------+-------+------------------------------------------------+ +| df.transform | 387 | | ++-------------------------+-------+------------------------------------------------+ +| df.merge | 376 | | ++-------------------------+-------+------------------------------------------------+ +| df.add | 346 | | ++-------------------------+-------+------------------------------------------------+ +| df.isnull | 338 | | ++-------------------------+-------+------------------------------------------------+ +| df.min | 321 | | ++-------------------------+-------+------------------------------------------------+ +| df.copy | 314 | | ++-------------------------+-------+------------------------------------------------+ +| df.replace | 300 | 
| ++-------------------------+-------+------------------------------------------------+ +| df.std | 261 | | ++-------------------------+-------+------------------------------------------------+ +| df.hist | 246 | | ++-------------------------+-------+------------------------------------------------+ +| df.filter | 234 | | ++-------------------------+-------+------------------------------------------------+ +| df.describe | 220 | | ++-------------------------+-------+------------------------------------------------+ +| df.ne | 218 | | ++-------------------------+-------+------------------------------------------------+ +| df.corr | 217 | | ++-------------------------+-------+------------------------------------------------+ +| df.median | 217 | | ++-------------------------+-------+------------------------------------------------+ +| df.items | 212 | | ++-------------------------+-------+------------------------------------------------+ +| pd.to_datetime | 204 | | ++-------------------------+-------+------------------------------------------------+ +| df.isin | 203 | | ++-------------------------+-------+------------------------------------------------+ +| df.dropna | 195 | | ++-------------------------+-------+------------------------------------------------+ +| pd.get_dummies | 190 | | ++-------------------------+-------+------------------------------------------------+ +| df.rename | 185 | | ++-------------------------+-------+------------------------------------------------+ +| df.info | 180 | | ++-------------------------+-------+------------------------------------------------+ +| df.set_index | 166 | | ++-------------------------+-------+------------------------------------------------+ +| df.keys | 159 | | ++-------------------------+-------+------------------------------------------------+ +| df.sample | 155 | | ++-------------------------+-------+------------------------------------------------+ +| df.agg | 140 | | ++-------------------------+-------+------------------------------------------------+ +| df.where | 138 | | ++-------------------------+-------+------------------------------------------------+ +| df.boxplot | 134 | | ++-------------------------+-------+------------------------------------------------+ +| df.clip | 116 | | ++-------------------------+-------+------------------------------------------------+ +| df.round | 116 | | ++-------------------------+-------+------------------------------------------------+ +| df.abs | 101 | | ++-------------------------+-------+------------------------------------------------+ +| df.stack | 97 | | ++-------------------------+-------+------------------------------------------------+ +| df.tail | 94 | | ++-------------------------+-------+------------------------------------------------+ +| df.update | 92 | | ++-------------------------+-------+------------------------------------------------+ +| df.iterrows | 90 | | ++-------------------------+-------+------------------------------------------------+ +| df.transpose | 87 | | ++-------------------------+-------+------------------------------------------------+ +| df.any | 85 | | ++-------------------------+-------+------------------------------------------------+ +| df.pipe | 80 | | ++-------------------------+-------+------------------------------------------------+ +| pd.eval | 73 | | ++-------------------------+-------+------------------------------------------------+ +| df.eval | 73 | | ++-------------------------+-------+------------------------------------------------+ +| 
pd.read_json | 72 | | ++-------------------------+-------+------------------------------------------------+ +| df.nunique | 70 | | ++-------------------------+-------+------------------------------------------------+ +| df.pivot | 70 | | ++-------------------------+-------+------------------------------------------------+ +| df.select | 68 | | ++-------------------------+-------+------------------------------------------------+ +| df.as_matrix | 67 | | ++-------------------------+-------+------------------------------------------------+ +| df.notnull | 66 | | ++-------------------------+-------+------------------------------------------------+ +| df.cumsum | 66 | | ++-------------------------+-------+------------------------------------------------+ +| df.prod | 64 | | ++-------------------------+-------+------------------------------------------------+ +| df.unstack | 64 | | ++-------------------------+-------+------------------------------------------------+ +| df.drop_duplicates | 63 | | ++-------------------------+-------+------------------------------------------------+ +| df.div | 63 | | ++-------------------------+-------+------------------------------------------------+ +| pd.crosstab | 59 | | ++-------------------------+-------+------------------------------------------------+ +| df.select_dtypes | 57 | | ++-------------------------+-------+------------------------------------------------+ +| df.pow | 56 | | ++-------------------------+-------+------------------------------------------------+ +| df.sort_index | 56 | | ++-------------------------+-------+------------------------------------------------+ +| df.product | 52 | | ++-------------------------+-------+------------------------------------------------+ +| df.isna | 51 | | ++-------------------------+-------+------------------------------------------------+ +| df.dot | 46 | | ++-------------------------+-------+------------------------------------------------+ +| pd.cut | 45 | | ++-------------------------+-------+------------------------------------------------+ +| df.bool | 44 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_dict | 44 | | ++-------------------------+-------+------------------------------------------------+ +| df.diff | 44 | | ++-------------------------+-------+------------------------------------------------+ +| df.insert | 44 | | ++-------------------------+-------+------------------------------------------------+ +| df.pop | 44 | | ++-------------------------+-------+------------------------------------------------+ +| df.query | 43 | | ++-------------------------+-------+------------------------------------------------+ +| df.var | 43 | | ++-------------------------+-------+------------------------------------------------+ +| df.__init__ | 41 | | ++-------------------------+-------+------------------------------------------------+ +| pd.to_numeric | 39 | | ++-------------------------+-------+------------------------------------------------+ +| df.squeeze | 39 | | ++-------------------------+-------+------------------------------------------------+ +| df.ge | 37 | | ++-------------------------+-------+------------------------------------------------+ +| df.quantile | 37 | | ++-------------------------+-------+------------------------------------------------+ +| df.reindex | 37 | | ++-------------------------+-------+------------------------------------------------+ +| df.rolling | 35 | | 
++-------------------------+-------+------------------------------------------------+ +| pd.factorize | 32 | | ++-------------------------+-------+------------------------------------------------+ +| pd.melt | 31 | | ++-------------------------+-------+------------------------------------------------+ +| df.melt | 31 | | ++-------------------------+-------+------------------------------------------------+ +| df.rank | 31 | | ++-------------------------+-------+------------------------------------------------+ +| pd.read_table | 30 | | ++-------------------------+-------+------------------------------------------------+ +| pd.pivot_table | 30 | | ++-------------------------+-------+------------------------------------------------+ +| df.idxmax | 30 | | ++-------------------------+-------+------------------------------------------------+ +| pd.test | 29 | | ++-------------------------+-------+------------------------------------------------+ +| df.iteritems | 29 | | ++-------------------------+-------+------------------------------------------------+ +| df.shift | 28 | | ++-------------------------+-------+------------------------------------------------+ +| df.mul | 28 | | ++-------------------------+-------+------------------------------------------------+ +| pd.qcut | 25 | | ++-------------------------+-------+------------------------------------------------+ +| df.set_value | 25 | | ++-------------------------+-------+------------------------------------------------+ +| df.all | 24 | | ++-------------------------+-------+------------------------------------------------+ +| df.skew | 24 | | ++-------------------------+-------+------------------------------------------------+ +| df.aggregate | 23 | | ++-------------------------+-------+------------------------------------------------+ +| pd.match | 22 | | ++-------------------------+-------+------------------------------------------------+ +| df.nlargest | 22 | | ++-------------------------+-------+------------------------------------------------+ +| df.multiply | 21 | | ++-------------------------+-------+------------------------------------------------+ +| df.set_axis | 19 | | ++-------------------------+-------+------------------------------------------------+ +| df.eq | 18 | | ++-------------------------+-------+------------------------------------------------+ +| df.resample | 18 | | ++-------------------------+-------+------------------------------------------------+ +| pd.read_sql | 17 | | ++-------------------------+-------+------------------------------------------------+ +| df.duplicated | 16 | | ++-------------------------+-------+------------------------------------------------+ +| pd.date_range | 16 | | ++-------------------------+-------+------------------------------------------------+ +| df.interpolate | 15 | | ++-------------------------+-------+------------------------------------------------+ +| df.memory_usage | 15 | | ++-------------------------+-------+------------------------------------------------+ +| df.divide | 14 | | ++-------------------------+-------+------------------------------------------------+ +| df.cov | 13 | | ++-------------------------+-------+------------------------------------------------+ +| df.assign | 12 | | ++-------------------------+-------+------------------------------------------------+ +| df.subtract | 12 | | ++-------------------------+-------+------------------------------------------------+ +| pd.read_pickle | 11 | | 
++-------------------------+-------+------------------------------------------------+ +| df.applymap | 11 | | ++-------------------------+-------+------------------------------------------------+ +| df.first | 11 | | ++-------------------------+-------+------------------------------------------------+ +| df.kurt | 10 | | ++-------------------------+-------+------------------------------------------------+ +| df.truncate | 10 | | ++-------------------------+-------+------------------------------------------------+ +| df.get_value | 9 | | ++-------------------------+-------+------------------------------------------------+ +| pd.read_hdf | 9 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_html | 9 | | ++-------------------------+-------+------------------------------------------------+ +| pd.read_sql_query | 9 | | ++-------------------------+-------+------------------------------------------------+ +| df.take | 8 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_pickle | 7 | | ++-------------------------+-------+------------------------------------------------+ +| df.itertuples | 7 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_string | 7 | | ++-------------------------+-------+------------------------------------------------+ +| df.last | 7 | | ++-------------------------+-------+------------------------------------------------+ +| df.sem | 7 | | ++-------------------------+-------+------------------------------------------------+ +| pd.to_pickle | 7 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_json | 7 | | ++-------------------------+-------+------------------------------------------------+ +| df.idxmin | 7 | | ++-------------------------+-------+------------------------------------------------+ +| df.xs | 6 | | ++-------------------------+-------+------------------------------------------------+ +| df.combine | 6 | | ++-------------------------+-------+------------------------------------------------+ +| pd.rolling_mean | 6 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_period | 6 | | ++-------------------------+-------+------------------------------------------------+ +| df.convert_objects | 5 | | ++-------------------------+-------+------------------------------------------------+ +| df.mask | 4 | | ++-------------------------+-------+------------------------------------------------+ +| df.pct_change | 4 | | ++-------------------------+-------+------------------------------------------------+ +| df.add_prefix | 4 | | ++-------------------------+-------+------------------------------------------------+ +| pd.read_excel | 4 | | ++-------------------------+-------+------------------------------------------------+ +| pd.rolling_std | 3 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_records | 3 | | ++-------------------------+-------+------------------------------------------------+ +| df.corrwith | 3 | | ++-------------------------+-------+------------------------------------------------+ +| df.swapaxes | 3 | | ++-------------------------+-------+------------------------------------------------+ +| df.__iter__ | 3 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_sql | 3 | | 
++-------------------------+-------+------------------------------------------------+ +| pd.read_feather | 3 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_feather | 3 | | ++-------------------------+-------+------------------------------------------------+ +| df.__len__ | 3 | | ++-------------------------+-------+------------------------------------------------+ +| df.kurtosis | 3 | | ++-------------------------+-------+------------------------------------------------+ +| df.mod | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_sparse | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.get_values | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.__eq__ | 2 | | ++-------------------------+-------+------------------------------------------------+ +| pd.bdate_range | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.get_dtype_counts | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.combine_first | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df._get_numeric_data | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.nsmallest | 2 | | ++-------------------------+-------+------------------------------------------------+ +| pd.scatter_matrix | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.rename_axis | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.__setstate__ | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.cumprod | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.__getstate__ | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.equals | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.__getitem__ | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.clip_upper | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.floordiv | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_excel | 2 | | ++-------------------------+-------+------------------------------------------------+ +| df.reindex_axis | 1 | | ++-------------------------+-------+------------------------------------------------+ +| pd.to_timedelta | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.ewm | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.tz_localize | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.tz_convert | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_hdf | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.lookup | 1 | | ++-------------------------+-------+------------------------------------------------+ +| pd.merge_ordered | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.swaplevel | 1 | | 
++-------------------------+-------+------------------------------------------------+ +| df.first_valid_index | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.lt | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.add_suffix | 1 | | ++-------------------------+-------+------------------------------------------------+ +| pd.rolling_median | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_dense | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.mad | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.align | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.__copy__ | 1 | | ++-------------------------+-------+------------------------------------------------+ +| pd.set_eng_float_format | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.add_suffix | 1 | | ++-------------------------+-------+------------------------------------------------+ +| pd.rolling_median | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.to_dense | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.mad | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.align | 1 | | ++-------------------------+-------+------------------------------------------------+ +| df.__copy__ | 1 | | ++-------------------------+-------+------------------------------------------------+ +| pd.set_eng_float_format | 1 | | ++-------------------------+-------+------------------------------------------------+ + +---------------------------+---------------------------------+----------------------------------------------------+ | DataFrame method | Eland Implementation? 
(Y/N/P/D) | Notes for Current implementation | +---------------------------+---------------------------------+----------------------------------------------------+ diff --git a/eland/index.py b/eland/index.py index c175916..3b3b243 100644 --- a/eland/index.py +++ b/eland/index.py @@ -53,6 +53,7 @@ class Index: # Make iterable def __next__(self): # TODO resolve this hack to make this 'iterable' + print("In Index.__next__") raise StopIteration() def __iter__(self): diff --git a/eland/mappings.py b/eland/mappings.py index 79d60ef..6f17dfb 100644 --- a/eland/mappings.py +++ b/eland/mappings.py @@ -1,8 +1,9 @@ import warnings import pandas as pd +from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype, + is_string_dtype) -from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype, is_string_dtype) class Mappings: """ @@ -33,8 +34,7 @@ class Mappings: def __init__(self, client=None, index_pattern=None, - mappings=None, - columns=None): + mappings=None): """ Parameters ---------- @@ -48,9 +48,6 @@ class Mappings: mappings: Mappings Object to copy - - columns: list of str - Columns to copy """ if (client is not None) and (index_pattern is not None): get_mapping = client.get_mapping(index=index_pattern) @@ -203,11 +200,11 @@ class Mappings: if 'non_aggregatable_indices' in vv: warnings.warn("Field {} has conflicting aggregatable fields across indexes {}", - format(field_name, vv['non_aggregatable_indices']), + format(field, vv['non_aggregatable_indices']), UserWarning) if 'non_searchable_indices' in vv: warnings.warn("Field {} has conflicting searchable fields across indexes {}", - format(field_name, vv['non_searchable_indices']), + format(field, vv['non_searchable_indices']), UserWarning) capability_matrix_df = pd.DataFrame.from_dict(capability_matrix, orient='index', columns=columns) @@ -406,16 +403,23 @@ class Mappings: return is_source_field - def numeric_source_fields(self): + def numeric_source_fields(self, columns): """ Returns ------- numeric_source_fields: list of str - List of source fields where pd_dtype == (int64 or float64) + List of source fields where pd_dtype == (int64 or float64 or bool) """ - return self._mappings_capabilities[(self._mappings_capabilities._source == True) & - ((self._mappings_capabilities.pd_dtype == 'int64') | - (self._mappings_capabilities.pd_dtype == 'float64'))].index.tolist() + if columns is not None: + return self._mappings_capabilities[(self._mappings_capabilities._source == True) & + ((self._mappings_capabilities.pd_dtype == 'int64') | + (self._mappings_capabilities.pd_dtype == 'float64') | + (self._mappings_capabilities.pd_dtype == 'bool'))].loc[columns].index.tolist() + else: + return self._mappings_capabilities[(self._mappings_capabilities._source == True) & + ((self._mappings_capabilities.pd_dtype == 'int64') | + (self._mappings_capabilities.pd_dtype == 'float64') | + (self._mappings_capabilities.pd_dtype == 'bool'))].index.tolist() def source_fields(self): """ @@ -435,16 +439,20 @@ class Mappings: """ return len(self.source_fields()) - def dtypes(self): + def dtypes(self, columns=None): """ Returns ------- dtypes: pd.Series Source field name + pd_dtype """ + if columns is not None: + return pd.Series( + {key: self._source_field_pd_dtypes[key] for key in columns}) + return pd.Series(self._source_field_pd_dtypes) - def get_dtype_counts(self): + def get_dtype_counts(self, columns=None): """ Return counts of unique dtypes in this 
object. @@ -453,10 +461,17 @@ class Mappings: get_dtype_counts : Series Series with the count of columns with each dtype. """ - return pd.Series(self._mappings_capabilities[self._mappings_capabilities._source == True].groupby('pd_dtype')[ - '_source'].count().to_dict()) + + if columns is not None: + return pd.Series(self._mappings_capabilities[self._mappings_capabilities._source == True] + .loc[columns] + .groupby('pd_dtype')['_source'] + .count().to_dict()) + + return pd.Series(self._mappings_capabilities[self._mappings_capabilities._source == True] + .groupby('pd_dtype')['_source'] + .count().to_dict()) def info_es(self, buf): buf.write("Mappings:\n") buf.write("\tcapabilities: {0}\n".format(self._mappings_capabilities)) - diff --git a/eland/ndframe.py b/eland/ndframe.py index 30b2897..a72aebf 100644 --- a/eland/ndframe.py +++ b/eland/ndframe.py @@ -23,8 +23,13 @@ only Elasticsearch aggregatable fields can be aggregated or grouped. """ +import sys + +import pandas as pd from modin.pandas.base import BasePandasDataset from modin.pandas.indexing import _iLocIndexer +from pandas.util._validators import validate_bool_kwarg +from pandas.core.dtypes.common import is_list_like from eland import ElandQueryCompiler @@ -52,12 +57,18 @@ class NDFrame(BasePandasDataset): index_field=index_field) self._query_compiler = query_compiler - def _get_index(self): return self._query_compiler.index index = property(_get_index) + @property + def dtypes(self): + return self._query_compiler.dtypes + + def get_dtype_counts(self): + return self._query_compiler.get_dtype_counts() + def _build_repr_df(self, num_rows, num_cols): # Overriden version of BasePandasDataset._build_repr_df # to avoid issues with concat @@ -91,6 +102,10 @@ class NDFrame(BasePandasDataset): return self[key] raise e + def __sizeof__(self): + # Don't default to pandas, just return approximation TODO - make this more accurate + return sys.getsizeof(self._query_compiler) + @property def iloc(self): """Purely integer-location based indexing for selection by position. @@ -101,3 +116,117 @@ class NDFrame(BasePandasDataset): def info_es(self, buf): self._query_compiler.info_es(buf) + def drop( + self, + labels=None, + axis=0, + index=None, + columns=None, + level=None, + inplace=False, + errors="raise", + ): + """Return new object with labels in requested axis removed. + Args: + labels: Index or column labels to drop. + axis: Whether to drop labels from the index (0 / 'index') or + columns (1 / 'columns'). + index, columns: Alternative to specifying axis (labels, axis=1 is + equivalent to columns=labels). + level: For MultiIndex + inplace: If True, do operation inplace and return None. + errors: If 'ignore', suppress error and existing labels are + dropped. 
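+
+        Example (an illustrative sketch; ``df`` and the _id values '1' and '2'
+        are hypothetical):
+            df.drop(columns=['AvgTicketPrice'])  # drop a column
+            df.drop(['1', '2'])                  # drop rows by index (_id) value
+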
+        Returns:
+            dropped : type of caller
+
+        (derived from modin.base.BasePandasDataset)
+        """
+        # Level not supported
+        if level is not None:
+            raise NotImplementedError("level not supported {}".format(level))
+
+        inplace = validate_bool_kwarg(inplace, "inplace")
+        if labels is not None:
+            if index is not None or columns is not None:
+                raise ValueError("Cannot specify both 'labels' and 'index'/'columns'")
+            axis = pd.DataFrame()._get_axis_name(axis)
+            axes = {axis: labels}
+        elif index is not None or columns is not None:
+            axes, _ = pd.DataFrame()._construct_axes_from_arguments(
+                (index, columns), {}
+            )
+        else:
+            raise ValueError(
+                "Need to specify at least one of 'labels', 'index' or 'columns'"
+            )
+
+        # TODO Clean up this error checking
+        if "index" not in axes:
+            axes["index"] = None
+        elif axes["index"] is not None:
+            if not is_list_like(axes["index"]):
+                axes["index"] = [axes["index"]]
+            if errors == "raise":
+                # Check if axes['index'] values exist in the index
+                count = self._query_compiler._index_matches_count(axes["index"])
+                if count != len(axes["index"]):
+                    raise ValueError(
+                        "number of labels {} != {} not contained in axis".format(count, len(axes["index"]))
+                    )
+            else:
+                """
+                axes["index"] = self._query_compiler.index_matches(axes["index"])
+                # If the length is zero, we will just do nothing
+                if not len(axes["index"]):
+                    axes["index"] = None
+                """
+                raise NotImplementedError()
+
+        if "columns" not in axes:
+            axes["columns"] = None
+        elif axes["columns"] is not None:
+            if not is_list_like(axes["columns"]):
+                axes["columns"] = [axes["columns"]]
+            if errors == "raise":
+                non_existent = [
+                    obj for obj in axes["columns"] if obj not in self.columns
+                ]
+                if len(non_existent):
+                    raise ValueError(
+                        "labels {} not contained in axis".format(non_existent)
+                    )
+            else:
+                axes["columns"] = [
+                    obj for obj in axes["columns"] if obj in self.columns
+                ]
+                # If the length is zero, we will just do nothing
+                if not len(axes["columns"]):
+                    axes["columns"] = None
+
+        new_query_compiler = self._query_compiler.drop(
+            index=axes["index"], columns=axes["columns"]
+        )
+        return self._create_or_update_from_compiler(new_query_compiler, inplace)
+
+    # TODO implement arguments
+    def mean(self):
+        return self._query_compiler.mean()
+
+    def sum(self, numeric_only=True):
+        if numeric_only == False:
+            raise NotImplementedError("Only sum of numeric fields is implemented")
+        return self._query_compiler.sum()
+
+    def min(self, numeric_only=True):
+        if numeric_only == False:
+            raise NotImplementedError("Only min of numeric fields is implemented")
+        return self._query_compiler.min()
+
+    def max(self, numeric_only=True):
+        if numeric_only == False:
+            raise NotImplementedError("Only max of numeric fields is implemented")
+        return self._query_compiler.max()
+
+    def describe(self):
+        return self._query_compiler.describe()
diff --git a/eland/operations.py b/eland/operations.py
index 7c6c863..b5fbdb3 100644
--- a/eland/operations.py
+++ b/eland/operations.py
@@ -1,9 +1,11 @@
+import copy
 from enum import Enum
 
-from pandas.core.indexes.numeric import Int64Index
-from pandas.core.indexes.range import RangeIndex
+import pandas as pd
+from elasticsearch_dsl import Search
 
-import copy
+from eland import Index
+from eland import Query
 
 
 class Operations:
@@ -16,17 +18,6 @@ class Operations:
     - a query to filter the results (e.g. 
df.A > 10) This is maintained as a 'task graph' (inspired by dask) - - A task graph is a dictionary mapping keys to computations: - - A key is any hashable value that is not a task: - ``` - {'x': 1, - 'y': 2, - 'z': (add, 'x', 'y'), - 'w': (sum, ['x', 'y', 'z']), - 'v': [(sum, ['w', 'z']), 2]} - ``` (see https://docs.dask.org/en/latest/spec.html) """ @@ -54,7 +45,6 @@ class Operations: return Operations.SortOrder.DESC - def __init__(self, tasks=None): if tasks == None: self._tasks = [] @@ -84,6 +74,11 @@ class Operations: # TODO - validate we are setting columns to a subset of last columns? task = ('columns', columns) self._tasks.append(task) + # Iterate backwards through task list looking for last 'columns' task + for task in reversed(self._tasks): + if task[0] == 'columns': + return task[1] + return None def get_columns(self): # Iterate backwards through task list looking for last 'columns' task @@ -95,10 +90,132 @@ class Operations: def __repr__(self): return repr(self._tasks) - def to_pandas(self, query_compiler): - query, post_processing = self._to_es_query() + def count(self, query_compiler): + query_params, post_processing = self._resolve_tasks() - size, sort_params = Operations._query_to_params(query) + # Elasticsearch _count is very efficient and so used to return results here. This means that + # data frames that have restricted size or sort params will not return valid results (_count doesn't support size). + # Longer term we may fall back to pandas, but this may result in loading all index into memory. + if self._size(query_params, post_processing) is not None: + raise NotImplementedError("Requesting count with additional query and processing parameters " + "not supported {0} {1}" + .format(query_params, post_processing)) + + # Only return requested columns + fields = query_compiler.columns + + counts = {} + for field in fields: + body = Query(query_params['query']) + body.exists(field, must=True) + + field_exists_count = query_compiler._client.count(index=query_compiler._index_pattern, + body=body.to_count_body()) + counts[field] = field_exists_count + + return pd.Series(data=counts, index=fields) + + def mean(self, query_compiler): + return self._metric_aggs(query_compiler, 'avg') + + def sum(self, query_compiler): + return self._metric_aggs(query_compiler, 'sum') + + def max(self, query_compiler): + return self._metric_aggs(query_compiler, 'max') + + def min(self, query_compiler): + return self._metric_aggs(query_compiler, 'min') + + def _metric_aggs(self, query_compiler, func): + query_params, post_processing = self._resolve_tasks() + + size = self._size(query_params, post_processing) + if size is not None: + raise NotImplementedError("Can not count field matches if size is set {}".format(size)) + + columns = self.get_columns() + + numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns) + + body = Query(query_params['query']) + + for field in numeric_source_fields: + body.metric_aggs(field, func, field) + + response = query_compiler._client.search( + index=query_compiler._index_pattern, + size=0, + body=body.to_search_body()) + + # Results are of the form + # "aggregations" : { + # "AvgTicketPrice" : { + # "value" : 628.2536888148849 + # } + # } + results = {} + + for field in numeric_source_fields: + results[field] = response['aggregations'][field]['value'] + + s = pd.Series(data=results, index=numeric_source_fields) + + return s + + def describe(self, query_compiler): + query_params, post_processing = self._resolve_tasks() + + size = 
self._size(query_params, post_processing) + if size is not None: + raise NotImplementedError("Can not count field matches if size is set {}".format(size)) + + columns = self.get_columns() + + numeric_source_fields = query_compiler._mappings.numeric_source_fields(columns) + + # for each field we compute: + # count, mean, std, min, 25%, 50%, 75%, max + body = Query(query_params['query']) + + for field in numeric_source_fields: + body.metric_aggs('extended_stats_' + field, 'extended_stats', field) + body.metric_aggs('percentiles_' + field, 'percentiles', field) + + print(body.to_search_body()) + + response = query_compiler._client.search( + index=query_compiler._index_pattern, + size=0, + body=body.to_search_body()) + + results = {} + + for field in numeric_source_fields: + values = list() + values.append(response['aggregations']['extended_stats_' + field]['count']) + values.append(response['aggregations']['extended_stats_' + field]['avg']) + values.append(response['aggregations']['extended_stats_' + field]['std_deviation']) + values.append(response['aggregations']['extended_stats_' + field]['min']) + values.append(response['aggregations']['percentiles_' + field]['values']['25.0']) + values.append(response['aggregations']['percentiles_' + field]['values']['50.0']) + values.append(response['aggregations']['percentiles_' + field]['values']['75.0']) + values.append(response['aggregations']['extended_stats_' + field]['max']) + + # if not None + if values.count(None) < len(values): + results[field] = values + + df = pd.DataFrame(data=results, index=['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']) + + return df + + def to_pandas(self, query_compiler): + query_params, post_processing = self._resolve_tasks() + + size, sort_params = Operations._query_params_to_size_and_sort(query_params) + + body = Query(query_params['query']) # Only return requested columns columns = self.get_columns() @@ -110,10 +227,12 @@ class Operations: index=query_compiler._index_pattern, size=size, sort=sort_params, + body=body.to_search_body(), _source=columns) else: es_results = query_compiler._client.scan( index=query_compiler._index_pattern, + query=body.to_search_body(), _source=columns) # create post sort if sort_params is not None: @@ -132,25 +251,65 @@ class Operations: task = ('squeeze', axis) self._tasks.append(task) - def to_count(self, query_compiler): - query, post_processing = self._to_es_query() + def index_count(self, query_compiler, field): + # field is the index field so count values + query_params, post_processing = self._resolve_tasks() - size = query['query_size'] # can be None - - pp_size = self._count_post_processing(post_processing) - if pp_size is not None: - if size is not None: - size = min(size, pp_size) - else: - size = pp_size + size = self._size(query_params, post_processing) # Size is dictated by operations if size is not None: + # TODO - this is not necessarily valid as the field may not exist in ALL these docs return size - exists_query = {"query": {"exists": {"field": query_compiler.index.index_field}}} + body = Query(query_params['query']) + body.exists(field, must=True) - return query_compiler._client.count(index=query_compiler._index_pattern, body=exists_query) + return query_compiler._client.count(index=query_compiler._index_pattern, body=body.to_count_body()) + + def _validate_index_operation(self, items): + if not isinstance(items, list): + raise TypeError("list item required - not {}".format(type(items))) + + # field is the index field so count values + query_params, 
post_processing = self._resolve_tasks()
+
+        size = self._size(query_params, post_processing)
+
+        # Size is dictated by operations
+        if size is not None:
+            raise NotImplementedError("Can not count field matches if size is set {}".format(size))
+
+        return query_params, post_processing
+
+    def index_matches_count(self, query_compiler, field, items):
+        query_params, post_processing = self._validate_index_operation(items)
+
+        body = Query(query_params['query'])
+
+        if field == Index.ID_INDEX_FIELD:
+            body.ids(items, must=True)
+        else:
+            body.terms(field, items, must=True)
+
+        return query_compiler._client.count(index=query_compiler._index_pattern, body=body.to_count_body())
+
+    def drop_index_values(self, query_compiler, field, items):
+        self._validate_index_operation(items)
+
+        # Putting boolean queries together
+        # i = 10
+        # not i = 20
+        # _id in [1,2,3]
+        # _id not in [1,2,3]
+        # a in ['a','b','c']
+        # b not in ['a','b','c']
+        # For now use term queries
+        if field == Index.ID_INDEX_FIELD:
+            task = ('query_ids', ('must_not', items))
+        else:
+            task = ('query_terms', ('must_not', (field, items)))
+        self._tasks.append(task)
 
     @staticmethod
     def _sort_params_to_postprocessing(input):
@@ -165,18 +324,21 @@ class Operations:
         return task
 
     @staticmethod
-    def _query_to_params(query):
+    def _query_params_to_size_and_sort(query_params):
         sort_params = None
-        if query['query_sort_field'] and query['query_sort_order']:
-            sort_params = query['query_sort_field'] + ":" + Operations.SortOrder.to_string(query['query_sort_order'])
+        if query_params['query_sort_field'] and query_params['query_sort_order']:
+            sort_params = query_params['query_sort_field'] + ":" + Operations.SortOrder.to_string(
+                query_params['query_sort_order'])
 
-        size = query['query_size']
+        size = query_params['query_size']
 
         return size, sort_params
 
     @staticmethod
     def _count_post_processing(post_processing):
-        size = None
+        size = None
         for action in post_processing:
             if action[0] == 'head' or action[0] == 'tail':
                 if size is None or action[1][1] < size:
@@ -201,45 +363,48 @@ class Operations:
             else:
                 df = df.sort_values(sort_field, False)
         elif action[0] == 'iloc':
-            index_indexer = action[1][0]
-            column_indexer = action[1][1]
-            if index_indexer is None:
-                index_indexer = slice(None)
-            if column_indexer is None:
-                column_indexer = slice(None)
-            df = df.iloc[index_indexer, column_indexer]
+            index_indexer = action[1][0]
+            column_indexer = action[1][1]
+            if index_indexer is None:
+                index_indexer = slice(None)
+            if column_indexer is None:
+                column_indexer = slice(None)
+            df = df.iloc[index_indexer, column_indexer]
         elif action[0] == 'squeeze':
-            print(df)
             df = df.squeeze(axis=action[1])
-            print(df)
 
         return df
 
-    def _to_es_query(self):
+    def _resolve_tasks(self):
         # We now try and combine all tasks into an Elasticsearch query
         # Some operations can be simply combined into a single query
         # other operations require pre-queries and then combinations
         # other operations require in-core post-processing of results
 
-        query = {"query_sort_field": None,
-                 "query_sort_order": None,
-                 "query_size": None,
-                 "query_fields": None}
+        query_params = {"query_sort_field": None,
+                        "query_sort_order": None,
+                        "query_size": None,
+                        "query_fields": None,
+                        "query": Query()}
 
         post_processing = []
 
         for task in self._tasks:
             if task[0] == 'head':
-                query, post_processing = self._resolve_head(task, query, post_processing)
+                query_params, post_processing = self._resolve_head(task, query_params, post_processing)
             elif task[0] == 'tail':
-                query, post_processing = self._resolve_tail(task, query, 
post_processing) + query_params, post_processing = self._resolve_tail(task, query_params, post_processing) elif task[0] == 'iloc': - query, post_processing = self._resolve_iloc(task, query, post_processing) - else: # a lot of operations simply post-process the dataframe - put these straight through - query, post_processing = self._resolve_post_processing_task(task, query, post_processing) + query_params, post_processing = self._resolve_iloc(task, query_params, post_processing) + elif task[0] == 'query_ids': + query_params, post_processing = self._resolve_query_ids(task, query_params, post_processing) + elif task[0] == 'query_terms': + query_params, post_processing = self._resolve_query_terms(task, query_params, post_processing) + else: # a lot of operations simply post-process the dataframe - put these straight through + query_params, post_processing = self._resolve_post_processing_task(task, query_params, post_processing) - return query, post_processing + return query_params, post_processing - def _resolve_head(self, item, query, post_processing): + def _resolve_head(self, item, query_params, post_processing): # head - sort asc, size n # |12345-------------| query_sort_field = item[1][0] @@ -251,26 +416,26 @@ class Operations: # overwriting previous head) if len(post_processing) > 0: post_processing.append(item) - return query, post_processing + return query_params, post_processing - if query['query_sort_field'] is None: - query['query_sort_field'] = query_sort_field + if query_params['query_sort_field'] is None: + query_params['query_sort_field'] = query_sort_field # if it is already sorted we use existing field - if query['query_sort_order'] is None: - query['query_sort_order'] = query_sort_order + if query_params['query_sort_order'] is None: + query_params['query_sort_order'] = query_sort_order # if it is already sorted we get head of existing order - if query['query_size'] is None: - query['query_size'] = query_size + if query_params['query_size'] is None: + query_params['query_size'] = query_size else: # truncate if head is smaller - if query_size < query['query_size']: - query['query_size'] = query_size + if query_size < query_params['query_size']: + query_params['query_size'] = query_size - return query, post_processing + return query_params, post_processing - def _resolve_tail(self, item, query, post_processing): + def _resolve_tail(self, item, query_params, post_processing): # tail - sort desc, size n, post-process sort asc # |-------------12345| query_sort_field = item[1][0] @@ -278,41 +443,41 @@ class Operations: query_size = item[1][1] # If this is a tail of a tail adjust settings and return - if query['query_size'] is not None and \ - query['query_sort_order'] == query_sort_order and \ + if query_params['query_size'] is not None and \ + query_params['query_sort_order'] == query_sort_order and \ post_processing == [('sort_index')]: - if query_size < query['query_size']: - query['query_size'] = query_size - return query, post_processing + if query_size < query_params['query_size']: + query_params['query_size'] = query_size + return query_params, post_processing # If we are already postprocessing the query results, just get 'tail' of these # (note, currently we just append another tail, we don't optimise by # overwriting previous tail) if len(post_processing) > 0: post_processing.append(item) - return query, post_processing + return query_params, post_processing # If results are already constrained, just get 'tail' of these # (note, currently we just append another tail, we 
don't optimise by # overwriting previous tail) - if query['query_size'] is not None: + if query_params['query_size'] is not None: post_processing.append(item) - return query, post_processing + return query_params, post_processing else: - query['query_size'] = query_size - if query['query_sort_field'] is None: - query['query_sort_field'] = query_sort_field - if query['query_sort_order'] is None: - query['query_sort_order'] = query_sort_order + query_params['query_size'] = query_size + if query_params['query_sort_field'] is None: + query_params['query_sort_field'] = query_sort_field + if query_params['query_sort_order'] is None: + query_params['query_sort_order'] = query_sort_order else: # reverse sort order - query['query_sort_order'] = Operations.SortOrder.reverse(query_sort_order) + query_params['query_sort_order'] = Operations.SortOrder.reverse(query_sort_order) post_processing.append(('sort_index')) - return query, post_processing + return query_params, post_processing - def _resolve_iloc(self, item, query, post_processing): + def _resolve_iloc(self, item, query_params, post_processing): # tail - sort desc, size n, post-process sort asc # |---4--7-9---------| @@ -322,33 +487,70 @@ class Operations: last_item = int_index.max() # If we have a query_size we do this post processing - if query['query_size'] is not None: + if query_params['query_size'] is not None: post_processing.append(item) - return query, post_processing + return query_params, post_processing # size should be > last item - query['query_size'] = last_item + 1 + query_params['query_size'] = last_item + 1 post_processing.append(item) - return query, post_processing + return query_params, post_processing - def _resolve_post_processing_task(self, item, query, post_processing): + def _resolve_query_ids(self, item, query_params, post_processing): + # task = ('query_ids', ('must_not', items)) + must_clause = item[1][0] + ids = item[1][1] + + if must_clause == 'must': + query_params['query'].ids(ids, must=True) + else: + query_params['query'].ids(ids, must=False) + + return query_params, post_processing + + def _resolve_query_terms(self, item, query_params, post_processing): + # task = ('query_terms', ('must_not', (field, terms))) + must_clause = item[1][0] + field = item[1][1][0] + terms = item[1][1][1] + + if must_clause == 'must': + query_params['query'].terms(field, terms, must=True) + else: + query_params['query'].terms(field, terms, must=False) + + return query_params, post_processing + + def _resolve_post_processing_task(self, item, query_params, post_processing): # Just do this in post-processing post_processing.append(item) - return query, post_processing + return query_params, post_processing + + def _size(self, query_params, post_processing): + # Shrink wrap code around checking if size parameter is set + size = query_params['query_size'] # can be None + + pp_size = self._count_post_processing(post_processing) + if pp_size is not None: + if size is not None: + size = min(size, pp_size) + else: + size = pp_size + + # This can return None + return size def info_es(self, buf): buf.write("Operations:\n") buf.write("\ttasks: {0}\n".format(self._tasks)) - query, post_processing = self._to_es_query() - size, sort_params = Operations._query_to_params(query) + query_params, post_processing = self._resolve_tasks() + size, sort_params = Operations._query_params_to_size_and_sort(query_params) columns = self.get_columns() buf.write("\tsize: {0}\n".format(size)) buf.write("\tsort_params: {0}\n".format(sort_params)) 
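+        # (sort_params is either None or a string of the form '<field>:asc' or
+        # '<field>:desc', as assembled by _query_params_to_size_and_sort)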
diff --git a/eland/query.py b/eland/query.py
new file mode 100644
index 0000000..69b6e59
--- /dev/null
+++ b/eland/query.py
@@ -0,0 +1,93 @@
+import warnings
+from copy import deepcopy
+
+
+class Query:
+    """
+    Simple class to manage building Elasticsearch queries.
+
+    Specifically, this builds the 'bool' must/must_not clauses from
+    exists/ids/terms filters, collects metric aggregations, and renders
+    them as Elasticsearch search or count bodies.
+    """
+
+    def __init__(self, query=None):
+        if query is None:
+            self._query = self._query_template()
+            self._aggs = {}
+        else:
+            # Deep copy the incoming query so we can change it
+            self._query = deepcopy(query._query)
+            self._aggs = deepcopy(query._aggs)
+
+    def exists(self, field, must=True):
+        """
+        Add exists query
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-exists-query.html
+        """
+        if must:
+            self._query['bool']['must'].append({'exists': {'field': field}})
+        else:
+            self._query['bool']['must_not'].append({'exists': {'field': field}})
+
+    def ids(self, items, must=True):
+        """
+        Add ids query
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-ids-query.html
+        """
+        if must:
+            self._query['bool']['must'].append({'ids': {'values': items}})
+        else:
+            self._query['bool']['must_not'].append({'ids': {'values': items}})
+
+    def terms(self, field, items, must=True):
+        """
+        Add terms query
+        https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-terms-query.html
+        """
+        if must:
+            self._query['bool']['must'].append({'terms': {field: items}})
+        else:
+            self._query['bool']['must_not'].append({'terms': {field: items}})
+
+    def metric_aggs(self, name, func, field):
+        """
+        Add metric agg, e.g.
+
+        "aggs": {
+            "name": {
+                "max": {
+                    "field": "AvgTicketPrice"
+                }
+            }
+        }
+        """
+        agg = {
+            func: {
+                "field": field
+            }
+        }
+        self._aggs[name] = agg
+
+    def to_search_body(self):
+        body = {"query": self._query, "aggs": self._aggs}
+        return body
+
+    def to_count_body(self):
+        if len(self._aggs) > 0:
+            warnings.warn('Requesting count for agg query {0}'.format(self))
+        body = {"query": self._query}
+
+        return body
+
+    def __repr__(self):
+        return repr(self.to_search_body())
+
+    @staticmethod
+    def _query_template():
+        template = {
+            "bool": {
+                "must": [],
+                "must_not": []
+            }
+        }
+        return deepcopy(template)
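A quick sketch of how Query composes (field names are from the test flights
index; the output shape follows to_search_body() above):

    from eland import Query

    q = Query()
    q.terms('Carrier', ['Kibana Airlines'])       # must clause by default
    q.exists('Cancelled', must=False)             # goes into must_not
    q.metric_aggs('max_price', 'max', 'AvgTicketPrice')

    q.to_search_body()
    # {'query': {'bool': {'must': [{'terms': {'Carrier': ['Kibana Airlines']}}],
    #                     'must_not': [{'exists': {'field': 'Cancelled'}}]}},
    #  'aggs': {'max_price': {'max': {'field': 'AvgTicketPrice'}}}}
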
diff --git a/eland/query_compiler.py b/eland/query_compiler.py
index f96e26c..2e45c19 100644
--- a/eland/query_compiler.py
+++ b/eland/query_compiler.py
@@ -52,6 +52,17 @@ class ElandQueryCompiler(BaseQueryCompiler):
     columns = property(_get_columns, _set_columns)
     index = property(_get_index)
 
+    @property
+    def dtypes(self):
+        columns = self._operations.get_columns()
+
+        return self._mappings.dtypes(columns)
+
+    def get_dtype_counts(self):
+        columns = self._operations.get_columns()
+
+        return self._mappings.get_dtype_counts(columns)
+
     # END Index, columns, and dtypes objects
 
     def _es_results_to_pandas(self, results):
@@ -226,7 +237,25 @@ class ElandQueryCompiler(BaseQueryCompiler):
         index_count: int
             Count of docs where index_field exists
         """
-        return self._operations.to_count(self)
+        return self._operations.index_count(self, self.index.index_field)
+
+    def _index_matches_count(self, items):
+        """
+        Returns
+        -------
+        index_count: int
+            Count of docs where items exist
+        """
+        return self._operations.index_matches_count(self, self.index.index_field, items)
+
+    def _index_matches(self, items):
+        """
+        Returns
+        -------
+        matches: list
+            List of the items that match the index field
+        """
+        return self._operations.index_matches(self, self.index.index_field, items)
 
     def copy(self):
         return self.__constructor__(
@@ -295,6 +324,31 @@ class ElandQueryCompiler(BaseQueryCompiler):
 
         return result
 
+    def drop(self, index=None, columns=None):
+        result = self.copy()
+
+        # Drop starts from all columns and removes the ones to be dropped
+        if columns is not None:
+            # columns is a pandas.Index so we can use pandas drop feature
+            new_columns = self.columns.drop(columns)
+            result._operations.set_columns(new_columns.to_list())
+
+        if index is not None:
+            result._operations.drop_index_values(self, self.index.index_field, index)
+
+        return result
+
+    def count(self):
+        return self._operations.count(self)
+
+    def mean(self):
+        return self._operations.mean(self)
+
+    def sum(self):
+        return self._operations.sum(self)
+
+    def min(self):
+        return self._operations.min(self)
+
+    def max(self):
+        return self._operations.max(self)
+
     def info_es(self, buf):
         buf.write("index_pattern: {index_pattern}\n".format(index_pattern=self._index_pattern))
 
@@ -302,3 +356,5 @@ class ElandQueryCompiler(BaseQueryCompiler):
         self._mappings.info_es(buf)
         self._operations.info_es(buf)
 
+    def describe(self):
+        return self._operations.describe(self)
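To illustrate how drop() plumbs through to the query layer: dropping columns
simply narrows the column list tracked by the operations object, while
dropping index values queues a ('query_ids', ('must_not', ...)) task that
Query.ids() renders as a bool must_not clause. A sketch (host, index pattern
and document ids here are hypothetical):

    import eland as ed

    ed_flights = ed.DataFrame('localhost', 'flights')
    ed_dropped = ed_flights.drop(columns=['Carrier'], index=['0', '1'])

    # query fragment now carried by the operations, approximately:
    # {'bool': {'must': [], 'must_not': [{'ids': {'values': ['0', '1']}}]}}
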
diff --git a/eland/tests/common.py b/eland/tests/common.py
index a1fe68a..03123e6 100644
--- a/eland/tests/common.py
+++ b/eland/tests/common.py
@@ -56,6 +56,18 @@ def assert_pandas_eland_frame_equal(left, right):
     # Use pandas tests to check similarity
     assert_frame_equal(left, right._to_pandas())
 
+
+def assert_eland_frame_equal(left, right):
+    if not isinstance(left, ed.DataFrame):
+        raise AssertionError("Expected type {exp_type}, found {act_type} instead".format(
+            exp_type='ed.DataFrame', act_type=type(left)))
+
+    if not isinstance(right, ed.DataFrame):
+        raise AssertionError("Expected type {exp_type}, found {act_type} instead".format(
+            exp_type='ed.DataFrame', act_type=type(right)))
+
+    # Use pandas tests to check similarity
+    assert_frame_equal(left._to_pandas(), right._to_pandas())
+
 
 def assert_pandas_eland_series_equal(left, right):
     if not isinstance(left, pd.Series):
diff --git a/eland/tests/dataframe/test_count_pytest.py b/eland/tests/dataframe/test_count_pytest.py
index 4d3af5f..1768895 100644
--- a/eland/tests/dataframe/test_count_pytest.py
+++ b/eland/tests/dataframe/test_count_pytest.py
@@ -5,9 +5,15 @@ from eland.tests.common import TestData
 
 class TestDataFrameCount(TestData):
 
-    def test_to_string1(self):
-        ed_flights = self.ed_flights()
-        pd_flights = self.pd_flights()
+    def test_to_count1(self):
+        pd_ecommerce = self.pd_ecommerce()
+        ed_ecommerce = self.ed_ecommerce()
+
+        pd_count = pd_ecommerce.count()
+        ed_count = ed_ecommerce.count()
+
+        print(pd_count)
+        print(ed_count)
 
-        #ed_count = ed_flights.count()
diff --git a/eland/tests/dataframe/test_describe_pytest.py b/eland/tests/dataframe/test_describe_pytest.py
new file mode 100644
index 0000000..801fad9
--- /dev/null
+++ b/eland/tests/dataframe/test_describe_pytest.py
@@ -0,0 +1,40 @@
+# File called _pytest for PyCharm compatibility
+
+from eland.tests.common import TestData
+
+
+class TestDataFrameDescribe(TestData):
+
+    def test_to_describe1(self):
+        pd_flights = self.pd_flights()
+        ed_flights = self.ed_flights()
+
+        pd_describe = pd_flights.describe()
+        ed_describe = ed_flights.describe()
+
+        # TODO - this fails now as ES aggregations are approximate;
+        #        if the ES percentile agg used
+        #        "hdr": {
+        #            "number_of_significant_value_digits": 3
+        #        }
+        #        this would work
+        # assert_almost_equal(pd_describe, ed_describe)
+
+        pd_ecommerce_describe = self.pd_ecommerce().describe()
+        ed_ecommerce_describe = self.ed_ecommerce().describe()
+
+        # We don't compare ecommerce here as the default dtypes in pandas from read_json
+        # don't match the mapping types. This is mainly because the products field is
+        # nested and so can be treated as a multi-field in ES, but not in pandas
+
+    def test_to_describe2(self):
+        pd_flights = self.pd_flights().head()
+        ed_flights = self.ed_flights().head()
+
+        pd_describe = pd_flights.describe()
+        ed_describe = ed_flights.describe()
+
+        print(pd_describe)
+        print(ed_describe)
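On the approximation TODO above: describe()'s percentile rows come from the
Elasticsearch percentiles aggregation, which is estimated (TDigest by
default). The HDR-histogram variant referenced in the comment is requested
with a body along these lines (agg name and field are examples):

    body = {
        "size": 0,
        "aggs": {
            "AvgTicketPrice_percentiles": {
                "percentiles": {
                    "field": "AvgTicketPrice",
                    "hdr": {"number_of_significant_value_digits": 3}
                }
            }
        }
    }
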
diff --git a/eland/tests/dataframe/test_drop_pytest.py b/eland/tests/dataframe/test_drop_pytest.py
new file mode 100644
index 0000000..e410851
--- /dev/null
+++ b/eland/tests/dataframe/test_drop_pytest.py
@@ -0,0 +1,56 @@
+# File called _pytest for PyCharm compatibility
+from eland.tests.common import TestData
+from eland.tests.common import assert_pandas_eland_frame_equal
+
+
+class TestDataFrameDrop(TestData):
+
+    def test_drop1(self):
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()
+
+        # ['AvgTicketPrice', 'Cancelled', 'Carrier', 'Dest', 'DestAirportID',
+        #  'DestCityName', 'DestCountry', 'DestLocation', 'DestRegion',
+        #  'DestWeather', 'DistanceKilometers', 'DistanceMiles', 'FlightDelay',
+        #  'FlightDelayMin', 'FlightDelayType', 'FlightNum', 'FlightTimeHour',
+        #  'FlightTimeMin', 'Origin', 'OriginAirportID', 'OriginCityName',
+        #  'OriginCountry', 'OriginLocation', 'OriginRegion', 'OriginWeather',
+        #  'dayOfWeek', 'timestamp']
+        pd_col0 = pd_flights.drop(['Carrier', 'DestCityName'], axis=1)
+        pd_col1 = pd_flights.drop(columns=['Carrier', 'DestCityName'])
+
+        ed_col0 = ed_flights.drop(['Carrier', 'DestCityName'], axis=1)
+        ed_col1 = ed_flights.drop(columns=['Carrier', 'DestCityName'])
+
+        # assert_pandas_eland_frame_equal(pd_col0, ed_col0)
+        # assert_pandas_eland_frame_equal(pd_col1, ed_col1)
+
+        # Drop rows by index
+        pd_idx0 = pd_flights.drop(['1', '2'])
+        ed_idx0 = ed_flights.drop(['1', '2'])
+
+        print(pd_idx0.info())
+        print(ed_idx0.info())
+
+        assert_pandas_eland_frame_equal(pd_idx0, ed_idx0)
diff --git a/eland/tests/dataframe/test_getitem_pytest.py b/eland/tests/dataframe/test_getitem_pytest.py
index 16c8d75..825c5a1 100644
--- a/eland/tests/dataframe/test_getitem_pytest.py
+++ b/eland/tests/dataframe/test_getitem_pytest.py
@@ -37,3 +37,19 @@ class TestDataFrameGetItem(TestData):
         pd_flights_OriginAirportID = pd_flights.OriginAirportID
 
         assert_pandas_eland_series_equal(pd_flights_OriginAirportID, ed_flights_OriginAirportID)
+
+    def test_getitem4(self):
+        ed_flights = self.ed_flights().head(89)
+        pd_flights = self.pd_flights().head(89)
+
+        ed_col0 = ed_flights[['DestCityName', 'DestCountry', 'DestLocation', 'DestRegion']]
+
+        # 'Carrier' was not selected above, so this should raise KeyError
+        try:
+            ed_col1 = ed_col0['Carrier']
+        except KeyError:
+            pass
+
+        pd_col1 = pd_flights['DestCountry']
+        ed_col1 = ed_col0['DestCountry']
+
+        assert_pandas_eland_series_equal(pd_col1, ed_col1)
diff --git a/eland/tests/dataframe/test_iloc_pytest.py b/eland/tests/dataframe/test_iloc_pytest.py
index 8c49d28..0b8d23c 100644
--- a/eland/tests/dataframe/test_iloc_pytest.py
+++ b/eland/tests/dataframe/test_iloc_pytest.py
@@ -12,28 +12,6 @@ import numpy as np
 
 class TestDataFrameiLoc(TestData):
 
-    def test_range(self):
-        columns = ['a', 'b', 'c', 'd', 'e']
-
-        r = pd.RangeIndex(0, 3, 1)
-
-        i = pd.Int64Index([1, 2])
-
-        dates = pd.date_range('1/1/2000', periods=8)
-
-        df = pd.DataFrame(np.random.randn(8, 4), index = dates, columns = ['A', 'B', 'C', 'D'])
-
-        print(df)
-
-        print("STEVE ", df.squeeze())
-
-        ii = slice(None)
-        rr = slice(None)
-
-        print(df.iloc[:, 0:3])
-        print(df.iloc[i, r])
-        print(df.iloc[ii, rr])
-
     def test_iloc1(self):
         ed_flights = self.ed_flights()
         pd_flights = self.pd_flights()
diff --git a/eland/tests/dataframe/test_info_pytest.py b/eland/tests/dataframe/test_info_pytest.py
index 2fd2436..00102b4 100644
--- a/eland/tests/dataframe/test_info_pytest.py
+++ b/eland/tests/dataframe/test_info_pytest.py
@@ -1,4 +1,5 @@
 # File called _pytest for PyCharm compatability
+from io import StringIO
 
 from eland.tests.common import TestData
 
@@ -9,4 +10,16 @@ class TestDataFrameInfo(TestData):
         ed_flights = self.ed_flights()
         pd_flights = self.pd_flights()
 
-        ed_flights.info()
+        ed_buf = StringIO()
+        pd_buf = StringIO()
+
+        # Ignore memory_usage and first line (class name)
+        ed_flights.info(buf=ed_buf, memory_usage=False)
+        pd_flights.info(buf=pd_buf, memory_usage=False)
+
+        ed_buf_lines = ed_buf.getvalue().split('\n')
+        pd_buf_lines = pd_buf.getvalue().split('\n')
+
+        assert pd_buf_lines[1:] == ed_buf_lines[1:]
+
+        print(self.ed_ecommerce().info())
diff --git a/eland/tests/dataframe/test_metrics_pytest.py b/eland/tests/dataframe/test_metrics_pytest.py
new file mode 100644
index 0000000..8efda63
--- /dev/null
+++ b/eland/tests/dataframe/test_metrics_pytest.py
@@ -0,0 +1,46 @@
+# File called _pytest for PyCharm compatibility
+
+from eland.tests.common import TestData
+
+from pandas.util.testing import assert_series_equal
+
+
+class TestDataFrameMetrics(TestData):
+
+    def test_to_mean(self):
+        pd_flights = self.pd_flights()
+        ed_flights = self.ed_flights()
+
+        pd_mean = pd_flights.mean()
+        ed_mean = ed_flights.mean()
+
+        assert_series_equal(pd_mean, ed_mean)
+
+    def test_to_sum(self):
+        pd_flights = self.pd_flights()
+        ed_flights = self.ed_flights()
+
+        pd_sum = pd_flights.sum(numeric_only=True)
+        ed_sum = ed_flights.sum(numeric_only=True)
+
+        assert_series_equal(pd_sum, ed_sum)
+
+    def test_to_min(self):
+        pd_flights = self.pd_flights()
+        ed_flights = self.ed_flights()
+
+        pd_min = pd_flights.min(numeric_only=True)
+        ed_min = ed_flights.min(numeric_only=True)
+
+        assert_series_equal(pd_min, ed_min)
+
+    def test_to_max(self):
+        pd_flights = self.pd_flights()
+        ed_flights = self.ed_flights()
+
+        pd_max = pd_flights.max(numeric_only=True)
+        ed_max = ed_flights.max(numeric_only=True)
+
+        assert_series_equal(pd_max, ed_max)
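The metric tests above should resolve to single metric aggregations rather
than document fetches. A sketch with the new Query class (the agg name is
arbitrary; mean uses the ES 'avg' aggregation):

    from eland import Query

    q = Query()
    q.metric_aggs('AvgTicketPrice_avg', 'avg', 'AvgTicketPrice')

    q.to_search_body()
    # {'query': {'bool': {'must': [], 'must_not': []}},
    #  'aggs': {'AvgTicketPrice_avg': {'avg': {'field': 'AvgTicketPrice'}}}}
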
diff --git a/eland/tests/dataframe/test_sum_pytest.py b/eland/tests/dataframe/test_sum_pytest.py
new file mode 100644
index 0000000..6c2f051
--- /dev/null
+++ b/eland/tests/dataframe/test_sum_pytest.py
@@ -0,0 +1,20 @@
+# File called _pytest for PyCharm compatibility
+
+from eland.tests.common import TestData
+
+from pandas.util.testing import assert_series_equal
+
+
+class TestDataFrameSum(TestData):
+
+    def test_to_sum1(self):
+        pd_flights = self.pd_flights()
+        ed_flights = self.ed_flights()
+
+        pd_sum = pd_flights.sum(numeric_only=True)
+        ed_sum = ed_flights.sum(numeric_only=True)
+
+        assert_series_equal(pd_sum, ed_sum)
diff --git a/eland/tests/mappings/__init__.py b/eland/tests/mappings/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/eland/tests/mappings/test_dtypes_pytest.py b/eland/tests/mappings/test_dtypes_pytest.py
new file mode 100644
index 0000000..2f0b07e
--- /dev/null
+++ b/eland/tests/mappings/test_dtypes_pytest.py
@@ -0,0 +1,44 @@
+# File called _pytest for PyCharm compatibility
+
+from eland.tests.common import TestData
+
+from pandas.util.testing import assert_series_equal
+
+
+class TestMappingsDtypes(TestData):
+
+    def test_dtypes1(self):
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()
+
+        pd_dtypes = pd_flights.dtypes
+        ed_dtypes = ed_flights._query_compiler._mappings.dtypes()
+
+        assert_series_equal(pd_dtypes, ed_dtypes)
+
+    def test_dtypes2(self):
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()[['Carrier', 'AvgTicketPrice', 'Cancelled']]
+
+        pd_dtypes = pd_flights.dtypes
+        ed_dtypes = ed_flights._query_compiler._mappings.dtypes(columns=['Carrier', 'AvgTicketPrice', 'Cancelled'])
+
+        assert_series_equal(pd_dtypes, ed_dtypes)
+
+    def test_get_dtype_counts1(self):
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()
+
+        pd_dtypes = pd_flights.get_dtype_counts().sort_index()
+        ed_dtypes = ed_flights._query_compiler._mappings.get_dtype_counts().sort_index()
+
+        assert_series_equal(pd_dtypes, ed_dtypes)
+
+    def test_get_dtype_counts2(self):
+        ed_flights = self.ed_flights()
+        pd_flights = self.pd_flights()[['Carrier', 'AvgTicketPrice', 'Cancelled']]
+
+        pd_dtypes = pd_flights.get_dtype_counts().sort_index()
+        ed_dtypes = ed_flights._query_compiler._mappings.\
+            get_dtype_counts(columns=['Carrier', 'AvgTicketPrice', 'Cancelled']).sort_index()
+
+        assert_series_equal(pd_dtypes, ed_dtypes)
diff --git a/eland/tests/query/test_count_pytest.py b/eland/tests/query/test_count_pytest.py
new file mode 100644
index 0000000..5b1e93d
--- /dev/null
+++ b/eland/tests/query/test_count_pytest.py
@@ -0,0 +1,27 @@
+# File called _pytest for PyCharm compatibility
+
+from eland.tests.common import TestData
+
+from eland import Query
+
+
+class TestQueryCopy(TestData):
+
+    def test_copy(self):
+        q = Query()
+
+        q.exists('field_a')
+        q.exists('field_b', must=False)
+
+        print(q.to_search_body())
+
+        q1 = Query(q)
+
+        q.exists('field_c', must=False)
+        q1.exists('field_c1', must=False)
+
+        print(q.to_search_body())
+        print(q1.to_search_body())
diff --git a/eland/utils.py b/eland/utils.py
index a8e55a1..34ed1f4 100644
--- a/eland/utils.py
+++ b/eland/utils.py
@@ -36,7 +36,7 @@ def pandas_to_es(df, es_params, destination_index, if_exists='fail', chunk_size=
     mapping = Mappings._generate_es_mappings(df)
 
     # If table exists, check if_exists parameter
-    if client.indices().exists(destination_index):
+    if client.index_exists(index=destination_index):
         if if_exists == "fail":
             raise ValueError(
                 "Could not create the index [{0}] because it "
@@ -45,12 +45,12 @@ def pandas_to_es(df, es_params, destination_index, if_exists='fail', chunk_size=
                 "already exists. "
                 "Change the if_exists parameter to "
                 "'append' or 'replace' data.".format(destination_index)
             )
         elif if_exists == "replace":
-            client.indices().delete(destination_index)
-            client.indices().create(destination_index, mapping)
+            client.index_delete(index=destination_index)
+            client.index_create(index=destination_index, mapping=mapping)
         # elif if_exists == "append":
         #     TODO validate mapping is compatible
     else:
-        client.indices().create(destination_index, mapping)
+        client.index_create(index=destination_index, mapping=mapping)
 
     # Now add data
     actions = []
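For completeness, a round trip through the reworked helpers (host, index name
and DataFrame contents are made up; pandas_to_es lives in eland/utils.py):

    import pandas as pd
    from eland.utils import pandas_to_es

    df = pd.DataFrame({'AvgTicketPrice': [640.4, 922.1],
                       'Cancelled': [False, True]})

    # Replaces 'example_index' if it already exists, going through the new
    # Client.index_exists/index_delete/index_create wrappers above
    pandas_to_es(df, 'localhost', 'example_index', if_exists='replace')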