mirror of
https://github.com/elastic/eland.git
synced 2025-07-11 00:02:14 +08:00
iloc is (mainly) working.
This commit is contained in:
parent
245def48e9
commit
a73c999290
@ -1,11 +1,18 @@
|
||||
import warnings
|
||||
import sys
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
from pandas.compat import StringIO
|
||||
from pandas.core.common import apply_if_callable, is_bool_indexer
|
||||
from pandas.io.common import _expand_user, _stringify_path
|
||||
from pandas.io.formats import console
|
||||
from pandas.io.formats import format as fmt
|
||||
from pandas.io.formats.printing import pprint_thing
|
||||
|
||||
from eland import NDFrame
|
||||
from eland import Series
|
||||
|
||||
|
||||
class DataFrame(NDFrame):
|
||||
@ -67,13 +74,22 @@ class DataFrame(NDFrame):
|
||||
|
||||
return buf.getvalue()
|
||||
|
||||
def info_es(self):
|
||||
buf = StringIO()
|
||||
|
||||
super().info_es(buf)
|
||||
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
|
||||
def to_string(self, buf=None, columns=None, col_space=None, header=True,
|
||||
index=True, na_rep='NaN', formatters=None, float_format=None,
|
||||
sparsify=None, index_names=True, justify=None,
|
||||
max_rows=None, max_cols=None, show_dimensions=False,
|
||||
decimal='.', line_width=None):
|
||||
"""
|
||||
From pandas - except we set max_rows default to avoid careless
|
||||
From pandas - except we set max_rows default to avoid careless extraction of entire index
|
||||
"""
|
||||
if max_rows is None:
|
||||
warnings.warn("DataFrame.to_string called without max_rows set "
|
||||
@ -111,3 +127,82 @@ class DataFrame(NDFrame):
|
||||
if buf is None:
|
||||
result = _buf.getvalue()
|
||||
return result
|
||||
|
||||
def _getitem(self, key):
|
||||
"""Get the column specified by key for this DataFrame.
|
||||
|
||||
Args:
|
||||
key : The column name.
|
||||
|
||||
Returns:
|
||||
A Pandas Series representing the value for the column.
|
||||
"""
|
||||
key = apply_if_callable(key, self)
|
||||
# Shortcut if key is an actual column
|
||||
try:
|
||||
if key in self.columns:
|
||||
return self._getitem_column(key)
|
||||
except (KeyError, ValueError, TypeError):
|
||||
pass
|
||||
if isinstance(key, (Series, np.ndarray, pd.Index, list)):
|
||||
return self._getitem_array(key)
|
||||
elif isinstance(key, DataFrame):
|
||||
return self.where(key)
|
||||
else:
|
||||
return self._getitem_column(key)
|
||||
|
||||
def _getitem_column(self, key):
|
||||
if key not in self.columns:
|
||||
raise KeyError("{}".format(key))
|
||||
s = self._reduce_dimension(self._query_compiler.getitem_column_array([key]))
|
||||
s._parent = self
|
||||
return s
|
||||
|
||||
def _getitem_array(self, key):
|
||||
if isinstance(key, Series):
|
||||
key = key._to_pandas()
|
||||
if is_bool_indexer(key):
|
||||
if isinstance(key, pd.Series) and not key.index.equals(self.index):
|
||||
warnings.warn(
|
||||
"Boolean Series key will be reindexed to match DataFrame index.",
|
||||
PendingDeprecationWarning,
|
||||
stacklevel=3,
|
||||
)
|
||||
elif len(key) != len(self.index):
|
||||
raise ValueError(
|
||||
"Item wrong length {} instead of {}.".format(
|
||||
len(key), len(self.index)
|
||||
)
|
||||
)
|
||||
key = check_bool_indexer(self.index, key)
|
||||
# We convert to a RangeIndex because getitem_row_array is expecting a list
|
||||
# of indices, and RangeIndex will give us the exact indices of each boolean
|
||||
# requested.
|
||||
key = pd.RangeIndex(len(self.index))[key]
|
||||
if len(key):
|
||||
return DataFrame(
|
||||
query_compiler=self._query_compiler.getitem_row_array(key)
|
||||
)
|
||||
else:
|
||||
return DataFrame(columns=self.columns)
|
||||
else:
|
||||
if any(k not in self.columns for k in key):
|
||||
raise KeyError(
|
||||
"{} not index".format(
|
||||
str([k for k in key if k not in self.columns]).replace(",", "")
|
||||
)
|
||||
)
|
||||
return DataFrame(
|
||||
query_compiler=self._query_compiler.getitem_column_array(key)
|
||||
)
|
||||
|
||||
def _reduce_dimension(self, query_compiler):
|
||||
return Series(query_compiler=query_compiler)
|
||||
|
||||
def _to_pandas(self):
|
||||
return self._query_compiler.to_pandas()
|
||||
|
||||
def squeeze(self, axis=None):
|
||||
return DataFrame(
|
||||
query_compiler=self._query_compiler.squeeze(axis)
|
||||
)
|
||||
|
@ -49,3 +49,16 @@ class Index:
|
||||
|
||||
def __len__(self):
|
||||
return self._query_compiler._index_count()
|
||||
|
||||
# Make iterable
|
||||
def __next__(self):
|
||||
# TODO resolve this hack to make this 'iterable'
|
||||
raise StopIteration()
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def info_es(self, buf):
|
||||
buf.write("Index:\n")
|
||||
buf.write("\tindex_field: {0}\n".format(self.index_field))
|
||||
buf.write("\tis_source_field: {0}\n".format(self.is_source_field))
|
||||
|
@ -456,13 +456,7 @@ class Mappings:
|
||||
return pd.Series(self._mappings_capabilities[self._mappings_capabilities._source == True].groupby('pd_dtype')[
|
||||
'_source'].count().to_dict())
|
||||
|
||||
def to_pandas(self):
|
||||
"""
|
||||
|
||||
Returns
|
||||
-------
|
||||
df : pd.DataFrame
|
||||
pandas DaraFrame representing this index
|
||||
"""
|
||||
|
||||
def info_es(self, buf):
|
||||
buf.write("Mappings:\n")
|
||||
buf.write("\tcapabilities: {0}\n".format(self._mappings_capabilities))
|
||||
|
||||
|
@ -24,6 +24,7 @@ only Elasticsearch aggregatable fields can be aggregated or grouped.
|
||||
"""
|
||||
|
||||
from modin.pandas.base import BasePandasDataset
|
||||
from modin.pandas.indexing import _iLocIndexer
|
||||
|
||||
from eland import ElandQueryCompiler
|
||||
|
||||
@ -51,6 +52,7 @@ class NDFrame(BasePandasDataset):
|
||||
index_field=index_field)
|
||||
self._query_compiler = query_compiler
|
||||
|
||||
|
||||
def _get_index(self):
|
||||
return self._query_compiler.index
|
||||
|
||||
@ -72,6 +74,30 @@ class NDFrame(BasePandasDataset):
|
||||
|
||||
return head.append(tail)
|
||||
|
||||
def _to_pandas(self):
|
||||
return self._query_compiler.to_pandas()
|
||||
def __getattr__(self, key):
|
||||
"""After regular attribute access, looks up the name in the columns
|
||||
|
||||
Args:
|
||||
key (str): Attribute name.
|
||||
|
||||
Returns:
|
||||
The value of the attribute.
|
||||
"""
|
||||
print(key)
|
||||
try:
|
||||
return object.__getattribute__(self, key)
|
||||
except AttributeError as e:
|
||||
if key in self.columns:
|
||||
return self[key]
|
||||
raise e
|
||||
|
||||
@property
|
||||
def iloc(self):
|
||||
"""Purely integer-location based indexing for selection by position.
|
||||
|
||||
"""
|
||||
return _iLocIndexer(self)
|
||||
|
||||
def info_es(self, buf):
|
||||
self._query_compiler.info_es(buf)
|
||||
|
||||
|
@ -1,5 +1,10 @@
|
||||
from enum import Enum
|
||||
|
||||
from pandas.core.indexes.numeric import Int64Index
|
||||
from pandas.core.indexes.range import RangeIndex
|
||||
|
||||
import copy
|
||||
|
||||
|
||||
class Operations:
|
||||
"""
|
||||
@ -39,9 +44,16 @@ class Operations:
|
||||
@staticmethod
|
||||
def to_string(order):
|
||||
if order == Operations.SortOrder.ASC:
|
||||
return ":asc"
|
||||
return "asc"
|
||||
|
||||
return "desc"
|
||||
|
||||
def from_string(order):
|
||||
if order == "asc":
|
||||
return Operations.SortOrder.ASC
|
||||
|
||||
return Operations.SortOrder.DESC
|
||||
|
||||
return ":desc"
|
||||
|
||||
def __init__(self, tasks=None):
|
||||
if tasks == None:
|
||||
@ -53,7 +65,7 @@ class Operations:
|
||||
return type(self)(*args, **kwargs)
|
||||
|
||||
def copy(self):
|
||||
return self.__constructor__(tasks=self._tasks.copy())
|
||||
return self.__constructor__(tasks=copy.deepcopy(self._tasks))
|
||||
|
||||
def head(self, index, n):
|
||||
# Add a task that is an ascending sort with size=n
|
||||
@ -61,13 +73,24 @@ class Operations:
|
||||
self._tasks.append(task)
|
||||
|
||||
def tail(self, index, n):
|
||||
|
||||
# Add a task that is descending sort with size=n
|
||||
task = ('tail', (index.sort_field, n))
|
||||
self._tasks.append(task)
|
||||
|
||||
def set_columns(self, columns):
|
||||
self._tasks['columns'] = columns
|
||||
# Setting columns at different phases of the task list may result in different
|
||||
# operations. So instead of setting columns once, set when it happens in call chain
|
||||
# TODO - column renaming
|
||||
# TODO - validate we are setting columns to a subset of last columns?
|
||||
task = ('columns', columns)
|
||||
self._tasks.append(task)
|
||||
|
||||
def get_columns(self):
|
||||
# Iterate backwards through task list looking for last 'columns' task
|
||||
for task in reversed(self._tasks):
|
||||
if task[0] == 'columns':
|
||||
return task[1]
|
||||
return None
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self._tasks)
|
||||
@ -77,15 +100,38 @@ class Operations:
|
||||
|
||||
size, sort_params = Operations._query_to_params(query)
|
||||
|
||||
es_results = query_compiler._client.search(
|
||||
index=query_compiler._index_pattern,
|
||||
size=size,
|
||||
sort=sort_params)
|
||||
# Only return requested columns
|
||||
columns = self.get_columns()
|
||||
|
||||
# If size=None use scan not search - then post sort results when in df
|
||||
# If size>10000 use scan
|
||||
if size is not None and size <= 10000:
|
||||
es_results = query_compiler._client.search(
|
||||
index=query_compiler._index_pattern,
|
||||
size=size,
|
||||
sort=sort_params,
|
||||
_source=columns)
|
||||
else:
|
||||
es_results = query_compiler._client.scan(
|
||||
index=query_compiler._index_pattern,
|
||||
_source=columns)
|
||||
# create post sort
|
||||
if sort_params is not None:
|
||||
post_processing.append(self._sort_params_to_postprocessing(sort_params))
|
||||
|
||||
df = query_compiler._es_results_to_pandas(es_results)
|
||||
|
||||
return self._apply_df_post_processing(df, post_processing)
|
||||
|
||||
def iloc(self, index, columns):
|
||||
# index and columns are indexers
|
||||
task = ('iloc', (index, columns))
|
||||
self._tasks.append(task)
|
||||
|
||||
def squeeze(self, axis):
|
||||
task = ('squeeze', axis)
|
||||
self._tasks.append(task)
|
||||
|
||||
def to_count(self, query_compiler):
|
||||
query, post_processing = self._to_es_query()
|
||||
|
||||
@ -106,11 +152,23 @@ class Operations:
|
||||
|
||||
return query_compiler._client.count(index=query_compiler._index_pattern, body=exists_query)
|
||||
|
||||
@staticmethod
|
||||
def _sort_params_to_postprocessing(input):
|
||||
# Split string
|
||||
sort_params = input.split(":")
|
||||
|
||||
query_sort_field = sort_params[0]
|
||||
query_sort_order = Operations.SortOrder.from_string(sort_params[1])
|
||||
|
||||
task = ('sort_field', (query_sort_field, query_sort_order))
|
||||
|
||||
return task
|
||||
|
||||
@staticmethod
|
||||
def _query_to_params(query):
|
||||
sort_params = None
|
||||
if query['query_sort_field'] and query['query_sort_order']:
|
||||
sort_params = query['query_sort_field'] + Operations.SortOrder.to_string(query['query_sort_order'])
|
||||
sort_params = query['query_sort_field'] + ":" + Operations.SortOrder.to_string(query['query_sort_order'])
|
||||
|
||||
size = query['query_size']
|
||||
|
||||
@ -135,6 +193,25 @@ class Operations:
|
||||
df = df.head(action[1][1])
|
||||
elif action[0] == 'tail':
|
||||
df = df.tail(action[1][1])
|
||||
elif action[0] == 'sort_field':
|
||||
sort_field = action[1][0]
|
||||
sort_order = action[1][1]
|
||||
if sort_order == Operations.SortOrder.ASC:
|
||||
df = df.sort_values(sort_field, True)
|
||||
else:
|
||||
df = df.sort_values(sort_field, False)
|
||||
elif action[0] == 'iloc':
|
||||
index_indexer = action[1][0]
|
||||
column_indexer = action[1][1]
|
||||
if index_indexer is None:
|
||||
index_indexer = slice(None)
|
||||
if column_indexer is None:
|
||||
column_indexer = slice(None)
|
||||
df = df.iloc[index_indexer, column_indexer]
|
||||
elif action[0] == 'squeeze':
|
||||
print(df)
|
||||
df = df.squeeze(axis=action[1])
|
||||
print(df)
|
||||
|
||||
return df
|
||||
|
||||
@ -145,7 +222,8 @@ class Operations:
|
||||
# other operations require in-core post-processing of results
|
||||
query = {"query_sort_field": None,
|
||||
"query_sort_order": None,
|
||||
"query_size": None}
|
||||
"query_size": None,
|
||||
"query_fields": None}
|
||||
|
||||
post_processing = []
|
||||
|
||||
@ -154,6 +232,10 @@ class Operations:
|
||||
query, post_processing = self._resolve_head(task, query, post_processing)
|
||||
elif task[0] == 'tail':
|
||||
query, post_processing = self._resolve_tail(task, query, post_processing)
|
||||
elif task[0] == 'iloc':
|
||||
query, post_processing = self._resolve_iloc(task, query, post_processing)
|
||||
else: # a lot of operations simply post-process the dataframe - put these straight through
|
||||
query, post_processing = self._resolve_post_processing_task(task, query, post_processing)
|
||||
|
||||
return query, post_processing
|
||||
|
||||
@ -229,3 +311,44 @@ class Operations:
|
||||
post_processing.append(('sort_index'))
|
||||
|
||||
return query, post_processing
|
||||
|
||||
def _resolve_iloc(self, item, query, post_processing):
|
||||
# tail - sort desc, size n, post-process sort asc
|
||||
# |---4--7-9---------|
|
||||
|
||||
# This is a list of items we return via an integer index
|
||||
int_index = item[1][0]
|
||||
if int_index is not None:
|
||||
last_item = int_index.max()
|
||||
|
||||
# If we have a query_size we do this post processing
|
||||
if query['query_size'] is not None:
|
||||
post_processing.append(item)
|
||||
return query, post_processing
|
||||
|
||||
# size should be > last item
|
||||
query['query_size'] = last_item + 1
|
||||
post_processing.append(item)
|
||||
|
||||
return query, post_processing
|
||||
|
||||
def _resolve_post_processing_task(self, item, query, post_processing):
|
||||
# Just do this in post-processing
|
||||
post_processing.append(item)
|
||||
|
||||
return query, post_processing
|
||||
|
||||
def info_es(self, buf):
|
||||
buf.write("Operations:\n")
|
||||
buf.write("\ttasks: {0}\n".format(self._tasks))
|
||||
|
||||
query, post_processing = self._to_es_query()
|
||||
size, sort_params = Operations._query_to_params(query)
|
||||
columns = self.get_columns()
|
||||
|
||||
buf.write("\tsize: {0}\n".format(size))
|
||||
buf.write("\tsort_params: {0}\n".format(sort_params))
|
||||
buf.write("\tcolumns: {0}\n".format(columns))
|
||||
buf.write("\tpost_processing: {0}\n".format(post_processing))
|
||||
|
||||
|
||||
|
@ -6,6 +6,9 @@ from eland import Index
|
||||
from eland import Mappings
|
||||
from eland import Operations
|
||||
|
||||
from pandas.core.indexes.numeric import Int64Index
|
||||
from pandas.core.indexes.range import RangeIndex
|
||||
|
||||
|
||||
class ElandQueryCompiler(BaseQueryCompiler):
|
||||
|
||||
@ -29,13 +32,24 @@ class ElandQueryCompiler(BaseQueryCompiler):
|
||||
else:
|
||||
self._operations = operations
|
||||
|
||||
if columns is not None:
|
||||
self.columns = columns
|
||||
|
||||
def _get_index(self):
|
||||
return self._index
|
||||
|
||||
def _get_columns(self):
|
||||
return pd.Index(self._mappings.source_fields())
|
||||
columns = self._operations.get_columns()
|
||||
if columns is None:
|
||||
# default to all
|
||||
columns = self._mappings.source_fields()
|
||||
|
||||
columns = property(_get_columns)
|
||||
return pd.Index(columns)
|
||||
|
||||
def _set_columns(self, columns):
|
||||
self._operations.set_columns(columns)
|
||||
|
||||
columns = property(_get_columns, _set_columns)
|
||||
index = property(_get_index)
|
||||
|
||||
# END Index, columns, and dtypes objects
|
||||
@ -218,7 +232,7 @@ class ElandQueryCompiler(BaseQueryCompiler):
|
||||
return self.__constructor__(
|
||||
client=self._client,
|
||||
index_pattern=self._index_pattern,
|
||||
columns=self.columns,
|
||||
columns=None, # columns are embedded in operations
|
||||
index_field=self._index.index_field,
|
||||
operations=self._operations.copy()
|
||||
)
|
||||
@ -245,3 +259,46 @@ class ElandQueryCompiler(BaseQueryCompiler):
|
||||
Pandas DataFrame
|
||||
"""
|
||||
return self._operations.to_pandas(self)
|
||||
|
||||
# __getitem__ methods
|
||||
def getitem_column_array(self, key, numeric=False):
|
||||
"""Get column data for target labels.
|
||||
|
||||
Args:
|
||||
key: Target labels by which to retrieve data.
|
||||
numeric: A boolean representing whether or not the key passed in represents
|
||||
the numeric index or the named index.
|
||||
|
||||
Returns:
|
||||
A new QueryCompiler.
|
||||
"""
|
||||
result = self.copy()
|
||||
|
||||
if numeric:
|
||||
raise NotImplementedError("Not implemented yet...")
|
||||
|
||||
result._operations.set_columns(key)
|
||||
|
||||
return result
|
||||
|
||||
def squeeze(self, axis=None):
|
||||
result = self.copy()
|
||||
|
||||
result._operations.squeeze(axis)
|
||||
|
||||
return result
|
||||
|
||||
def view(self, index=None, columns=None):
|
||||
result = self.copy()
|
||||
|
||||
result._operations.iloc(index, columns)
|
||||
|
||||
return result
|
||||
|
||||
def info_es(self, buf):
|
||||
buf.write("index_pattern: {index_pattern}\n".format(index_pattern=self._index_pattern))
|
||||
|
||||
self._index.info_es(buf)
|
||||
self._mappings.info_es(buf)
|
||||
self._operations.info_es(buf)
|
||||
|
||||
|
153
eland/series.py
153
eland/series.py
@ -0,0 +1,153 @@
|
||||
"""
|
||||
Series
|
||||
---------
|
||||
One-dimensional ndarray with axis labels (including time series).
|
||||
|
||||
The underlying data resides in Elasticsearch and the API aligns as much as
|
||||
possible with pandas.DataFrame API.
|
||||
|
||||
This allows the eland.Series to access large datasets stored in Elasticsearch,
|
||||
without storing the dataset in local memory.
|
||||
|
||||
Implementation Details
|
||||
----------------------
|
||||
Based on NDFrame which underpins eland.1DataFrame
|
||||
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
import warnings
|
||||
|
||||
from eland import NDFrame
|
||||
|
||||
|
||||
class Series(NDFrame):
|
||||
"""
|
||||
pandas.Series like API that proxies into Elasticsearch index(es).
|
||||
|
||||
Parameters
|
||||
----------
|
||||
client : eland.Client
|
||||
A reference to a Elasticsearch python client
|
||||
|
||||
index_pattern : str
|
||||
An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).
|
||||
|
||||
field_name : str
|
||||
The field to base the series on
|
||||
|
||||
See Also
|
||||
--------
|
||||
|
||||
Examples
|
||||
--------
|
||||
|
||||
import eland as ed
|
||||
client = ed.Client(Elasticsearch())
|
||||
s = ed.DataFrame(client, 'reviews', 'date')
|
||||
df.head()
|
||||
reviewerId vendorId rating date
|
||||
0 0 0 5 2006-04-07 17:08
|
||||
1 1 1 5 2006-05-04 12:16
|
||||
2 2 2 4 2006-04-21 12:26
|
||||
3 3 3 5 2006-04-18 15:48
|
||||
4 3 4 5 2006-04-18 15:49
|
||||
|
||||
Notice that the types are based on Elasticsearch mappings
|
||||
|
||||
Notes
|
||||
-----
|
||||
If the Elasticsearch index is deleted or index mappings are changed after this
|
||||
object is created, the object is not rebuilt and so inconsistencies can occur.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
client=None,
|
||||
index_pattern=None,
|
||||
name=None,
|
||||
index_field=None,
|
||||
query_compiler=None):
|
||||
# Series has 1 column
|
||||
if name is None:
|
||||
columns = None
|
||||
else:
|
||||
columns = [name]
|
||||
|
||||
super().__init__(
|
||||
client=client,
|
||||
index_pattern=index_pattern,
|
||||
columns=columns,
|
||||
index_field=index_field,
|
||||
query_compiler=query_compiler)
|
||||
|
||||
@property
|
||||
def empty(self):
|
||||
"""Determines if the Series is empty.
|
||||
|
||||
Returns:
|
||||
True if the Series is empty.
|
||||
False otherwise.
|
||||
"""
|
||||
# TODO - this is called on every attribute get (most methods) from modin/pandas/base.py:3337
|
||||
# (as Index.__len__ performs an query) we may want to cache self.index.empty()
|
||||
return len(self.index) == 0
|
||||
|
||||
def _get_name(self):
|
||||
return self._query_compiler.columns[0]
|
||||
|
||||
name = property(_get_name)
|
||||
|
||||
def head(self, n=5):
|
||||
return super().head(n)
|
||||
|
||||
def tail(self, n=5):
|
||||
return super().tail(n)
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Rendering Methods
|
||||
def __repr__(self):
|
||||
num_rows = pd.get_option("max_rows") or 60
|
||||
|
||||
return self.to_string(max_rows=num_rows)
|
||||
|
||||
def to_string(
|
||||
self,
|
||||
buf=None,
|
||||
na_rep="NaN",
|
||||
float_format=None,
|
||||
header=True,
|
||||
index=True,
|
||||
length=False,
|
||||
dtype=False,
|
||||
name=False,
|
||||
max_rows=None):
|
||||
|
||||
if max_rows is None:
|
||||
warnings.warn("Series.to_string called without max_rows set "
|
||||
"- this will return entire index results. "
|
||||
"Setting max_rows=60, overwrite if different behaviour is required.")
|
||||
max_rows = 60
|
||||
|
||||
# Create a slightly bigger dataframe than display
|
||||
temp_df = self._build_repr_df(max_rows+1, None)
|
||||
if isinstance(temp_df, pd.DataFrame):
|
||||
temp_df = temp_df[self.name]
|
||||
temp_str = repr(temp_df)
|
||||
if self.name is not None:
|
||||
name_str = "Name: {}, ".format(str(self.name))
|
||||
else:
|
||||
name_str = ""
|
||||
if len(self.index) > max_rows:
|
||||
len_str = "Length: {}, ".format(len(self.index))
|
||||
else:
|
||||
len_str = ""
|
||||
dtype_str = "dtype: {}".format(temp_str.rsplit("dtype: ", 1)[-1])
|
||||
if len(self) == 0:
|
||||
return "Series([], {}{}".format(name_str, dtype_str)
|
||||
return temp_str.rsplit("\nName:", 1)[0] + "\n{}{}{}".format(
|
||||
name_str, len_str, dtype_str
|
||||
)
|
||||
|
||||
def _to_pandas(self):
|
||||
return self._query_compiler.to_pandas()[self.name]
|
@ -1,10 +1,9 @@
|
||||
import pytest
|
||||
|
||||
import eland as ed
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from pandas.util.testing import (assert_frame_equal)
|
||||
from pandas.util.testing import (assert_frame_equal, assert_series_equal)
|
||||
|
||||
import os
|
||||
|
||||
@ -47,14 +46,26 @@ class TestData:
|
||||
|
||||
def assert_pandas_eland_frame_equal(left, right):
|
||||
if not isinstance(left, pd.DataFrame):
|
||||
raise AssertionError("Expected type {exp_type}, found {act_type} instead",
|
||||
exp_type=type(pd.DataFrame), act_type=type(left))
|
||||
raise AssertionError("Expected type {exp_type}, found {act_type} instead".format(
|
||||
exp_type='pd.DataFrame', act_type=type(left)))
|
||||
|
||||
if not isinstance(right, ed.DataFrame):
|
||||
raise AssertionError("Expected type {exp_type}, found {act_type} instead",
|
||||
exp_type=type(ed.DataFrame), act_type=type(right))
|
||||
raise AssertionError("Expected type {exp_type}, found {act_type} instead".format(
|
||||
exp_type='ed.DataFrame', act_type=type(right)))
|
||||
|
||||
# Use pandas tests to check similarity
|
||||
assert_frame_equal(left, right._to_pandas())
|
||||
|
||||
|
||||
def assert_pandas_eland_series_equal(left, right):
|
||||
if not isinstance(left, pd.Series):
|
||||
raise AssertionError("Expected type {exp_type}, found {act_type} instead".format(
|
||||
exp_type='pd.Series', act_type=type(left)))
|
||||
|
||||
if not isinstance(right, ed.Series):
|
||||
raise AssertionError("Expected type {exp_type}, found {act_type} instead".format(
|
||||
exp_type='ed.Series', act_type=type(right)))
|
||||
|
||||
# Use pandas tests to check similarity
|
||||
assert_series_equal(left, right._to_pandas())
|
||||
|
||||
|
13
eland/tests/dataframe/test_count_pytest.py
Normal file
13
eland/tests/dataframe/test_count_pytest.py
Normal file
@ -0,0 +1,13 @@
|
||||
# File called _pytest for PyCharm compatability
|
||||
|
||||
from eland.tests.common import TestData
|
||||
|
||||
|
||||
class TestDataFrameCount(TestData):
|
||||
|
||||
def test_to_string1(self):
|
||||
ed_flights = self.ed_flights()
|
||||
pd_flights = self.pd_flights()
|
||||
|
||||
#ed_count = ed_flights.count()
|
||||
|
39
eland/tests/dataframe/test_getitem_pytest.py
Normal file
39
eland/tests/dataframe/test_getitem_pytest.py
Normal file
@ -0,0 +1,39 @@
|
||||
# File called _pytest for PyCharm compatability
|
||||
import pandas as pd
|
||||
|
||||
from eland.tests.common import TestData
|
||||
from eland.tests.common import (
|
||||
assert_pandas_eland_frame_equal,
|
||||
assert_pandas_eland_series_equal
|
||||
)
|
||||
|
||||
|
||||
|
||||
class TestDataFrameGetItem(TestData):
|
||||
|
||||
def test_getitem1(self):
|
||||
ed_flights = self.ed_flights().head(103)
|
||||
pd_flights = self.pd_flights().head(103)
|
||||
|
||||
ed_flights_OriginAirportID = ed_flights['OriginAirportID']
|
||||
pd_flights_OriginAirportID = pd_flights['OriginAirportID']
|
||||
|
||||
assert_pandas_eland_series_equal(pd_flights_OriginAirportID, ed_flights_OriginAirportID)
|
||||
|
||||
def test_getitem2(self):
|
||||
ed_flights = self.ed_flights().head(42)
|
||||
pd_flights = self.pd_flights().head(42)
|
||||
|
||||
ed_flights_slice = ed_flights[['OriginAirportID', 'AvgTicketPrice', 'Carrier']]
|
||||
pd_flights_slice = pd_flights[['OriginAirportID', 'AvgTicketPrice', 'Carrier']]
|
||||
|
||||
assert_pandas_eland_frame_equal(pd_flights_slice, ed_flights_slice)
|
||||
|
||||
def test_getitem3(self):
|
||||
ed_flights = self.ed_flights().head(89)
|
||||
pd_flights = self.pd_flights().head(89)
|
||||
|
||||
ed_flights_OriginAirportID = ed_flights.OriginAirportID
|
||||
pd_flights_OriginAirportID = pd_flights.OriginAirportID
|
||||
|
||||
assert_pandas_eland_series_equal(pd_flights_OriginAirportID, ed_flights_OriginAirportID)
|
76
eland/tests/dataframe/test_iloc_pytest.py
Normal file
76
eland/tests/dataframe/test_iloc_pytest.py
Normal file
@ -0,0 +1,76 @@
|
||||
# File called _pytest for PyCharm compatability
|
||||
import pandas as pd
|
||||
import eland as ed
|
||||
|
||||
from eland.tests.common import TestData
|
||||
from eland.tests.common import (
|
||||
assert_pandas_eland_frame_equal,
|
||||
assert_pandas_eland_series_equal
|
||||
)
|
||||
|
||||
import numpy as np
|
||||
|
||||
class TestDataFrameiLoc(TestData):
|
||||
|
||||
def test_range(self):
|
||||
columns = ['a', 'b', 'c', 'd', 'e']
|
||||
|
||||
r = pd.RangeIndex(0, 3, 1)
|
||||
|
||||
i = pd.Int64Index([1, 2])
|
||||
|
||||
dates = pd.date_range('1/1/2000', periods=8)
|
||||
|
||||
df = pd.DataFrame(np.random.randn(8, 4), index = dates, columns = ['A', 'B', 'C', 'D'])
|
||||
|
||||
print(df)
|
||||
|
||||
print("STEVE ", df.squeeze())
|
||||
|
||||
ii = slice(None)
|
||||
rr = slice(None)
|
||||
|
||||
print(df.iloc[:, 0:3])
|
||||
print(df.iloc[i, r])
|
||||
print(df.iloc[ii, rr])
|
||||
|
||||
def test_iloc1(self):
|
||||
ed_flights = self.ed_flights()
|
||||
pd_flights = self.pd_flights()
|
||||
|
||||
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.iloc.html#pandas.DataFrame.iloc
|
||||
|
||||
#pd_flights.info()
|
||||
|
||||
pd_iloc0 = pd_flights.iloc[0]
|
||||
pd_iloc1= pd_flights.iloc[[0]]
|
||||
pd_iloc2= pd_flights.iloc[[0, 1]]
|
||||
pd_iloc3 = pd_flights.iloc[:3]
|
||||
pd_iloc4 = pd_flights.iloc[[True, False, True]]
|
||||
pd_iloc5 = pd_flights.iloc[0, 1]
|
||||
pd_iloc6 = pd_flights.iloc[[0, 2], [1, 3]]
|
||||
pd_iloc7 = pd_flights.iloc[1:3, 0:3]
|
||||
pd_iloc8 = pd_flights.iloc[:, [True, False, True, False]]
|
||||
pd_iloc9 = pd_flights.iloc[[True, False, True, False]]
|
||||
|
||||
ed_iloc0 = ed_flights.iloc[0]
|
||||
ed_iloc1 = ed_flights.iloc[[0]]
|
||||
ed_iloc2 = ed_flights.iloc[[0, 1]]
|
||||
ed_iloc3 = ed_flights.iloc[:3]
|
||||
ed_iloc4 = ed_flights.iloc[[True, False, True]]
|
||||
ed_iloc5 = ed_flights.iloc[0, 1]
|
||||
ed_iloc6 = ed_flights.iloc[[0, 2], [1, 3]]
|
||||
ed_iloc7 = ed_flights.iloc[1:3, 0:3]
|
||||
ed_iloc8 = ed_flights.iloc[:, [True, False, True, False]]
|
||||
ed_iloc9 = ed_flights.iloc[[True, False, True, False]]
|
||||
|
||||
#assert_pandas_eland_frame_equal(pd_iloc0, ed_iloc0) # pd_iloc0 is Series
|
||||
assert_pandas_eland_frame_equal(pd_iloc1, ed_iloc1)
|
||||
assert_pandas_eland_frame_equal(pd_iloc2, ed_iloc2)
|
||||
assert_pandas_eland_frame_equal(pd_iloc3, ed_iloc3)
|
||||
assert_pandas_eland_frame_equal(pd_iloc4, ed_iloc4)
|
||||
#assert_pandas_eland_frame_equal(pd_iloc5, ed_iloc5) # pd_iloc5 is numpy_bool
|
||||
assert_pandas_eland_frame_equal(pd_iloc6, ed_iloc6)
|
||||
assert_pandas_eland_frame_equal(pd_iloc7, ed_iloc7)
|
||||
assert_pandas_eland_frame_equal(pd_iloc8, ed_iloc8)
|
||||
assert_pandas_eland_frame_equal(pd_iloc9, ed_iloc9)
|
15
eland/tests/dataframe/test_info_es_pytest.py
Normal file
15
eland/tests/dataframe/test_info_es_pytest.py
Normal file
@ -0,0 +1,15 @@
|
||||
# File called _pytest for PyCharm compatability
|
||||
|
||||
from eland.tests.common import TestData
|
||||
|
||||
|
||||
class TestDataFrameInfo(TestData):
|
||||
|
||||
def test_to_info1(self):
|
||||
ed_flights = self.ed_flights()
|
||||
|
||||
head = ed_flights.head(103)
|
||||
slice = head[['timestamp', 'OriginRegion', 'Carrier']]
|
||||
iloc = slice.iloc[10:92, [0,2]]
|
||||
print(iloc.info_es())
|
||||
print(iloc)
|
12
eland/tests/dataframe/test_info_pytest.py
Normal file
12
eland/tests/dataframe/test_info_pytest.py
Normal file
@ -0,0 +1,12 @@
|
||||
# File called _pytest for PyCharm compatability
|
||||
|
||||
from eland.tests.common import TestData
|
||||
|
||||
|
||||
class TestDataFrameInfo(TestData):
|
||||
|
||||
def test_to_info1(self):
|
||||
ed_flights = self.ed_flights()
|
||||
pd_flights = self.pd_flights()
|
||||
|
||||
ed_flights.info()
|
0
eland/tests/series/__init__.py
Normal file
0
eland/tests/series/__init__.py
Normal file
29
eland/tests/series/test_head_tail_pytest.py
Normal file
29
eland/tests/series/test_head_tail_pytest.py
Normal file
@ -0,0 +1,29 @@
|
||||
# File called _pytest for PyCharm compatability
|
||||
import pandas as pd
|
||||
import eland as ed
|
||||
|
||||
from eland.tests.common import TestData
|
||||
from eland.tests.common import assert_pandas_eland_series_equal
|
||||
|
||||
from eland.tests import ELASTICSEARCH_HOST
|
||||
from eland.tests import FLIGHTS_INDEX_NAME
|
||||
|
||||
from pandas.util.testing import assert_series_equal
|
||||
|
||||
|
||||
|
||||
class TestSeriesHeadTail(TestData):
|
||||
|
||||
def test_head_tail(self):
|
||||
pd_s = self.pd_flights()['Carrier']
|
||||
ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier')
|
||||
|
||||
pd_s_head = pd_s.head(10)
|
||||
ed_s_head = ed_s.head(10)
|
||||
|
||||
assert_pandas_eland_series_equal(pd_s_head, ed_s_head)
|
||||
|
||||
pd_s_tail = pd_s.tail(10)
|
||||
ed_s_tail = ed_s.tail(10)
|
||||
|
||||
assert_pandas_eland_series_equal(pd_s_tail, ed_s_tail)
|
24
eland/tests/series/test_repr_pytest.py
Normal file
24
eland/tests/series/test_repr_pytest.py
Normal file
@ -0,0 +1,24 @@
|
||||
# File called _pytest for PyCharm compatability
|
||||
import pandas as pd
|
||||
import eland as ed
|
||||
|
||||
from eland.tests.common import TestData
|
||||
from eland.tests.common import assert_pandas_eland_frame_equal
|
||||
|
||||
from eland.tests import ELASTICSEARCH_HOST
|
||||
from eland.tests import FLIGHTS_INDEX_NAME
|
||||
|
||||
from pandas.util.testing import assert_series_equal
|
||||
|
||||
|
||||
|
||||
class TestSeriesRepr(TestData):
|
||||
|
||||
def test_repr(self):
|
||||
pd_s = self.pd_flights()['Carrier']
|
||||
ed_s = ed.Series(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME, 'Carrier')
|
||||
|
||||
pd_repr = repr(pd_s)
|
||||
ed_repr = repr(ed_s)
|
||||
|
||||
assert pd_repr == ed_repr
|
Loading…
x
Reference in New Issue
Block a user