eland/eland/dataframe.py
Stephen Dodson d71ce9f50c Adding drop + the ability for operations to have a query
Significant refactor - needs cleanup
2019-07-11 10:11:57 +00:00

360 lines
13 KiB
Python

import sys
import warnings

import numpy as np
import pandas as pd
from pandas.compat import StringIO
from pandas.core.common import apply_if_callable, is_bool_indexer
from pandas.core.indexing import check_bool_indexer
from pandas.io.common import _expand_user, _stringify_path
from pandas.io.formats import console
from pandas.io.formats import format as fmt
from pandas.io.formats.printing import pprint_thing

from eland import NDFrame
from eland import Series
class DataFrame(NDFrame):
    """pandas-style DataFrame whose operations are delegated to a query compiler.

    This is effectively 2 constructors:

    1. ``client``, ``index_pattern``, ``columns``, ``index_field``
    2. ``query_compiler``

    All arguments are forwarded unchanged to ``NDFrame.__init__``.
    """

    def __init__(self,
                 client=None,
                 index_pattern=None,
                 columns=None,
                 index_field=None,
                 query_compiler=None):
        # python 3 syntax
        super().__init__(
            client=client,
            index_pattern=index_pattern,
            columns=columns,
            index_field=index_field,
            query_compiler=query_compiler)

    def _get_columns(self):
        """Return the columns reported by the underlying query compiler."""
        return self._query_compiler.columns

    columns = property(_get_columns)

    @property
    def empty(self):
        """Determines if the DataFrame is empty.

        Returns:
            True if the DataFrame has no columns or no index entries.
            False otherwise.
        """
        # TODO - this is called on every attribute get (most methods) from modin/pandas/base.py:3337
        # (as Index.__len__ performs an query) we may want to cache self.index.empty()
        return len(self.columns) == 0 or len(self.index) == 0

    def head(self, n=5):
        """Delegate to the base-class ``head`` with ``n`` defaulting to 5."""
        return super().head(n)

    def tail(self, n=5):
        """Delegate to the base-class ``tail`` with ``n`` defaulting to 5."""
        return super().tail(n)

    def __repr__(self):
        """Render the frame as text, honouring the pandas ``display.*`` options.

        From pandas.
        """
        buf = StringIO()

        max_rows = pd.get_option("display.max_rows")
        max_cols = pd.get_option("display.max_columns")
        show_dimensions = pd.get_option("display.show_dimensions")
        # Only wrap to the console width when pandas is configured to do so.
        if pd.get_option("display.expand_frame_repr"):
            width, _ = console.get_console_size()
        else:
            width = None

        self.to_string(buf=buf, max_rows=max_rows, max_cols=max_cols,
                       line_width=width, show_dimensions=show_dimensions)

        return buf.getvalue()

    def count(self):
        """
        Count non-NA cells for each column (TODO row)

        Counts are based on exists queries against ES

        This is inefficient, as it creates N queries (N is number of fields).

        An alternative approach is to use value_count aggregations. However, they have issues in that:
        1. They can only be used with aggregatable fields (e.g. keyword not text)
        2. For list fields they return multiple counts. E.g. tags=['elastic', 'ml'] returns value_count=2
        for a single document.
        """
        return self._query_compiler.count()

    def info_es(self):
        """Return, as a string, whatever the base-class ``info_es`` writes to a buffer."""
        buf = StringIO()

        super().info_es(buf)

        return buf.getvalue()

    def _index_summary(self):
        """Build the ``Index: N entries, <first> to <last>`` line used by ``info``.

        The first and last labels are taken from ``head(1)`` and ``tail(1)``.
        When the frame has no rows there are no labels to show, so only the
        entry count is reported instead of raising ``IndexError``.
        """
        first = self.head(1)._to_pandas().index
        if len(first) > 0:
            last = self.tail(1)._to_pandas().index
            index_summary = ', %s to %s' % (pprint_thing(first[0]),
                                            pprint_thing(last[0]))
        else:
            # Nothing to index into; skip the tail lookup as well.
            index_summary = ''

        name = "Index"
        return '%s: %s entries%s' % (name, len(self), index_summary)

    def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
             null_counts=None):
        """
        Print a concise summary of a DataFrame.

        This method prints information about a DataFrame including
        the index dtype and column dtypes, non-null values and memory usage.

        This copies a lot of code from pandas.DataFrame.info as it is difficult
        to split out the appropriate code or creating a SparseDataFrame gives
        incorrect results on types and counts.

        Args:
            verbose: True forces the per-column listing, False forces the
                short column summary, None chooses based on ``max_cols``.
            buf: Writable buffer receiving the output; defaults to
                ``sys.stdout``.
            max_cols: Column-count threshold above which the short summary
                is used; defaults to the ``display.max_info_columns`` option.
            memory_usage: Whether to append a memory usage line; defaults to
                the ``display.memory_usage`` option.
            null_counts: Whether to show non-null counts; when None this is
                decided from the frame size and the pandas options.
        """
        if buf is None:  # pragma: no cover
            buf = sys.stdout

        lines = []

        lines.append(str(type(self)))
        lines.append(self._index_summary())

        if len(self.columns) == 0:
            lines.append('Empty {name}'.format(name=type(self).__name__))
            fmt.buffer_put_lines(buf, lines)
            return

        cols = self.columns

        # hack
        if max_cols is None:
            max_cols = pd.get_option('display.max_info_columns',
                                     len(self.columns) + 1)

        max_rows = pd.get_option('display.max_info_rows', len(self) + 1)

        if null_counts is None:
            show_counts = ((len(self.columns) <= max_cols) and
                           (len(self) < max_rows))
        else:
            show_counts = null_counts
        exceeds_info_cols = len(self.columns) > max_cols

        # From pandas.DataFrame
        def _put_str(s, space):
            # Truncate/pad ``s`` to exactly ``space`` characters.
            return '{s}'.format(s=s)[:space].ljust(space)

        def _verbose_repr():
            # One line per column: name, optional non-null count, dtype.
            lines.append('Data columns (total %d columns):' %
                         len(self.columns))
            space = max(len(pprint_thing(k)) for k in self.columns) + 4
            counts = None

            tmpl = "{count}{dtype}"
            if show_counts:
                counts = self.count()
                if len(cols) != len(counts):  # pragma: no cover
                    raise AssertionError(
                        'Columns must equal counts '
                        '({cols:d} != {counts:d})'.format(
                            cols=len(cols), counts=len(counts)))
                tmpl = "{count} non-null {dtype}"

            dtypes = self.dtypes
            for i, col in enumerate(self.columns):
                dtype = dtypes.iloc[i]
                col = pprint_thing(col)

                count = ""
                if show_counts:
                    count = counts.iloc[i]

                lines.append(_put_str(col, space) + tmpl.format(count=count,
                                                                 dtype=dtype))

        def _non_verbose_repr():
            # Single-line summary of the columns index.
            lines.append(self.columns._summary(name='Columns'))

        def _sizeof_fmt(num, size_qualifier):
            # returns size in human readable format
            for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
                if num < 1024.0:
                    return ("{num:3.1f}{size_q} "
                            "{x}".format(num=num, size_q=size_qualifier, x=x))
                num /= 1024.0
            return "{num:3.1f}{size_q} {pb}".format(num=num,
                                                    size_q=size_qualifier,
                                                    pb='PB')

        if verbose:
            _verbose_repr()
        elif verbose is False:  # specifically set to False, not nesc None
            _non_verbose_repr()
        else:
            if exceeds_info_cols:
                _non_verbose_repr()
            else:
                _verbose_repr()

        counts = self.get_dtype_counts()
        dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k
                  in sorted(counts.items())]
        lines.append('dtypes: {types}'.format(types=', '.join(dtypes)))

        if memory_usage is None:
            memory_usage = pd.get_option('display.memory_usage')
        if memory_usage:
            # append memory usage of df to display
            size_qualifier = ''

            # TODO - this is different from pd.DataFrame as we shouldn't
            #   really hold much in memory. For now just approximate with getsizeof + ignore deep
            mem_usage = sys.getsizeof(self)
            lines.append("memory usage: {mem}\n".format(
                mem=_sizeof_fmt(mem_usage, size_qualifier)))

        fmt.buffer_put_lines(buf, lines)

    def to_string(self, buf=None, columns=None, col_space=None, header=True,
                  index=True, na_rep='NaN', formatters=None, float_format=None,
                  sparsify=None, index_names=True, justify=None,
                  max_rows=None, max_cols=None, show_dimensions=False,
                  decimal='.', line_width=None):
        """
        From pandas - except we set max_rows default to avoid careless extraction of entire index

        The formatting keyword arguments are passed through to the
        ``to_string`` of the frame returned by ``_build_repr_df``.

        Args:
            buf: Optional buffer to write to.
            max_rows: Maximum rows to render; when None a warning is issued
                and 60 is used.
            show_dimensions: When true, append a ``[R rows x C columns]``
                footer computed from this frame's index and columns.

        Returns:
            The rendered string when ``buf`` is None, otherwise None.
        """
        if max_rows is None:
            warnings.warn("DataFrame.to_string called without max_rows set "
                          "- this will return entire index results. "
                          "Setting max_rows=60, overwrite if different behaviour is required.")
            max_rows = 60

        # Create a slightly bigger dataframe than display
        df = self._build_repr_df(max_rows+1, max_cols)

        if buf is not None:
            _buf = _expand_user(_stringify_path(buf))
        else:
            _buf = StringIO()

        df.to_string(buf=_buf, columns=columns,
                     col_space=col_space, na_rep=na_rep,
                     formatters=formatters,
                     float_format=float_format,
                     sparsify=sparsify, justify=justify,
                     index_names=index_names,
                     header=header, index=index,
                     max_rows=max_rows,
                     max_cols=max_cols,
                     show_dimensions=False,  # print this outside of this call
                     decimal=decimal,
                     line_width=line_width)

        # The frame rendered above is only a display-sized stand-in, so its
        # own row count is not the real one - write out the dimensions of
        # this DataFrame instead.
        if show_dimensions:
            _buf.write("\n\n[{nrows} rows x {ncols} columns]"
                       .format(nrows=len(self.index), ncols=len(self.columns)))

        if buf is None:
            result = _buf.getvalue()
            return result

    def _getitem(self, key):
        """Resolve ``key`` to a column, a set of columns or a filtered frame.

        Args:
            key: A column name, a list-like of column names, a boolean
                indexer, a ``DataFrame`` (passed to ``self.where``), or a
                callable that is evaluated against this frame first.

        Returns:
            The result of ``_getitem_column``, ``_getitem_array`` or
            ``self.where``, depending on the type of ``key``.
        """
        key = apply_if_callable(key, self)
        # Shortcut if key is an actual column
        try:
            if key in self.columns:
                return self._getitem_column(key)
        except (KeyError, ValueError, TypeError):
            # The membership test itself can fail for some key types
            # (presumably unhashable ones such as lists); fall through to
            # the type-based dispatch below.
            pass
        if isinstance(key, (Series, np.ndarray, pd.Index, list)):
            return self._getitem_array(key)
        elif isinstance(key, DataFrame):
            return self.where(key)
        else:
            return self._getitem_column(key)

    def _getitem_column(self, key):
        """Return the single column ``key`` as a ``Series``.

        Raises:
            KeyError: If ``key`` is not one of this frame's columns.
        """
        if key not in self.columns:
            raise KeyError("Requested column is not in the DataFrame {}".format(key))
        s = self._reduce_dimension(self._query_compiler.getitem_column_array([key]))
        # Remember which frame the column was taken from.
        s._parent = self
        return s

    def _getitem_array(self, key):
        """Select rows by boolean indexer, or columns by a list-like of names.

        Args:
            key: A boolean indexer (row selection) or an iterable of column
                names (column selection). An eland ``Series`` is converted
                to pandas first.

        Returns:
            A new ``DataFrame``.

        Raises:
            ValueError: If a boolean key has a different length to the index.
            KeyError: If any requested column name is not in this frame.
        """
        if isinstance(key, Series):
            key = key._to_pandas()
        if is_bool_indexer(key):
            if isinstance(key, pd.Series) and not key.index.equals(self.index):
                warnings.warn(
                    "Boolean Series key will be reindexed to match DataFrame index.",
                    PendingDeprecationWarning,
                    stacklevel=3,
                )
            elif len(key) != len(self.index):
                raise ValueError(
                    "Item wrong length {} instead of {}.".format(
                        len(key), len(self.index)
                    )
                )
            key = check_bool_indexer(self.index, key)
            # We convert to a RangeIndex because getitem_row_array is expecting a list
            # of indices, and RangeIndex will give us the exact indices of each boolean
            # requested.
            key = pd.RangeIndex(len(self.index))[key]
            if len(key):
                return DataFrame(
                    query_compiler=self._query_compiler.getitem_row_array(key)
                )
            else:
                return DataFrame(columns=self.columns)
        else:
            if any(k not in self.columns for k in key):
                raise KeyError(
                    "{} not index".format(
                        str([k for k in key if k not in self.columns]).replace(",", "")
                    )
                )
            return DataFrame(
                query_compiler=self._query_compiler.getitem_column_array(key)
            )

    def _create_or_update_from_compiler(self, new_query_compiler, inplace=False):
        """Returns or updates a DataFrame given new query_compiler

        Args:
            new_query_compiler: Must be an instance of the current query
                compiler's type, or its type must be a direct base of it.
            inplace: When false (default) a new ``DataFrame`` is returned;
                when true this frame's query compiler is replaced and None
                is returned.
        """
        assert (
            isinstance(new_query_compiler, type(self._query_compiler))
            or type(new_query_compiler) in self._query_compiler.__class__.__bases__
        ), "Invalid Query Compiler object: {}".format(type(new_query_compiler))
        if not inplace:
            return DataFrame(query_compiler=new_query_compiler)
        else:
            self._query_compiler = new_query_compiler

    def _reduce_dimension(self, query_compiler):
        """Wrap ``query_compiler`` in a ``Series``."""
        return Series(query_compiler=query_compiler)

    def _to_pandas(self):
        """Return the query compiler's ``to_pandas()`` result."""
        return self._query_compiler.to_pandas()

    def squeeze(self, axis=None):
        """Wrap the query compiler's ``squeeze(axis)`` result in a new ``DataFrame``."""
        return DataFrame(
            query_compiler=self._query_compiler.squeeze(axis)
        )