Merge pull request #12 from stevedodson/master

Added DataFrame.info() + more methods
This commit is contained in:
stevedodson 2019-06-26 14:26:10 +02:00 committed by GitHub
commit c5cee6afc8
7 changed files with 664 additions and 3053 deletions


@@ -32,6 +32,13 @@ import pandas as pd
from pandas.core.arrays.sparse import BlockIndex
from pandas.io.formats import format as fmt
from pandas.io.formats.printing import pprint_thing
from io import StringIO
import sys
class DataFrame():
"""
    pandas.DataFrame-like API that proxies into Elasticsearch index(es).
@@ -283,6 +290,121 @@ class DataFrame():
return df
def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
null_counts=None):
"""
Print a concise summary of a DataFrame.
This method prints information about a DataFrame including
the index dtype and column dtypes, non-null values and memory usage.
        This copies a lot of code from pandas.DataFrame.info, as it is difficult
        to split out the appropriate code, and creating a SparseDataFrame gives
        incorrect results for types and counts.
"""
if buf is None: # pragma: no cover
buf = sys.stdout
fake_df = self.__fake_dataframe__()
lines = []
lines.append(str(type(self)))
lines.append(fake_df.index._summary())
if len(self.columns) == 0:
lines.append('Empty {name}'.format(name=type(self).__name__))
fmt.buffer_put_lines(buf, lines)
return
cols = self.columns
# hack
if max_cols is None:
max_cols = pd.get_option('display.max_info_columns',
len(self.columns) + 1)
max_rows = pd.get_option('display.max_info_rows', len(self) + 1)
if null_counts is None:
show_counts = ((len(self.columns) <= max_cols) and
(len(self) < max_rows))
else:
show_counts = null_counts
exceeds_info_cols = len(self.columns) > max_cols
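        # For scale: with pandas defaults (display.max_info_columns=100 and
        # display.max_info_rows=1690785 at the time of writing), a 27-column
        # frame gets the verbose per-column listing with non-null counts, while
        # a 150-column frame falls back to the short Columns summary unless
        # verbose=True is passed explicitly.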
def _verbose_repr():
lines.append('Data columns (total %d columns):' %
len(self.columns))
space = max(len(pprint_thing(k)) for k in self.columns) + 4
counts = None
tmpl = "{count}{dtype}"
if show_counts:
counts = self.count()
if len(cols) != len(counts): # pragma: no cover
raise AssertionError(
'Columns must equal counts '
'({cols:d} != {counts:d})'.format(
cols=len(cols), counts=len(counts)))
tmpl = "{count} non-null {dtype}"
dtypes = self.dtypes
for i, col in enumerate(self.columns):
dtype = dtypes.iloc[i]
col = pprint_thing(col)
count = ""
if show_counts:
count = counts.iloc[i]
lines.append(_put_str(col, space) + tmpl.format(count=count,
dtype=dtype))
def _non_verbose_repr():
lines.append(self.columns._summary(name='Columns'))
def _sizeof_fmt(num, size_qualifier):
# returns size in human readable format
for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
if num < 1024.0:
return ("{num:3.1f}{size_q} "
"{x}".format(num=num, size_q=size_qualifier, x=x))
num /= 1024.0
return "{num:3.1f}{size_q} {pb}".format(num=num,
size_q=size_qualifier,
pb='PB')
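        # For intuition, two worked examples for the helper above (hypothetical
        # inputs, not from this commit):
        #   _sizeof_fmt(1.5 * 1024 ** 2, '')  -> '1.5 MB'
        #   _sizeof_fmt(float(2 ** 50), '+')  -> '1.0+ PB'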
if verbose:
_verbose_repr()
        elif verbose is False:  # explicitly set to False, not just None
_non_verbose_repr()
else:
if exceeds_info_cols:
_non_verbose_repr()
else:
_verbose_repr()
counts = self.get_dtype_counts()
dtypes = ['{k}({kk:d})'.format(k=k[0], kk=k[1]) for k
in sorted(counts.items())]
lines.append('dtypes: {types}'.format(types=', '.join(dtypes)))
if memory_usage is None:
memory_usage = pd.get_option('display.memory_usage')
if memory_usage:
# append memory usage of df to display
size_qualifier = ''
# TODO - this is different from pd.DataFrame as we shouldn't
# really hold much in memory. For now just approximate with getsizeof + ignore deep
mem_usage = sys.getsizeof(self)
lines.append("memory usage: {mem}\n".format(
mem=_sizeof_fmt(mem_usage, size_qualifier)))
fmt.buffer_put_lines(buf, lines)
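
A minimal usage sketch of the new method (not part of the commit). The constructor call mirrors how Mappings is built in the tests below; the host and the 'flights' index name are assumptions:

    import io

    import eland as ed

    df = ed.DataFrame(ed.Client('localhost'), 'flights')  # assumed host/index

    # Print a pandas-style summary to stdout ...
    df.info()

    # ... or capture it in a buffer, as test_info() below does
    buf = io.StringIO()
    df.info(buf=buf)
    print(buf.getvalue())
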
@property
def shape(self):
"""
@@ -301,7 +423,38 @@ class DataFrame():
@property
def columns(self):
-        return self.mappings.source_fields()
+        return pd.Index(self.mappings.source_fields())
@property
def dtypes(self):
return self.mappings.dtypes()
def get_dtype_counts(self):
return self.mappings.get_dtype_counts()
def count(self):
"""
        Count non-NA cells for each column (TODO: rows).

        Counts are based on 'exists' queries against Elasticsearch.

        This is inefficient, as it issues one query per field (N queries for N fields).
        An alternative approach would be value_count aggregations, but they have issues:
        1. They can only be used on aggregatable fields (e.g. 'keyword', not 'text').
        2. For list fields they count every value, e.g. tags=['elastic', 'ml'] gives
           value_count=2 for a single document.
"""
counts = {}
for field in self.mappings.source_fields():
exists_query = {"query":{"exists":{"field":field}}}
field_exists_count = self.client.count(index=self.index_pattern, body=exists_query)
counts[field] = field_exists_count
count = pd.Series(data=counts, index=self.mappings.source_fields())
return count
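
To make the exists-query strategy concrete, a standalone sketch against the low-level elasticsearch-py client (host, index, and field names are assumptions; the raw client returns a dict, so the count sits under the 'count' key, whereas the eland Client wrapper above appears to return the bare number):

    from elasticsearch import Elasticsearch

    es = Elasticsearch('http://localhost:9200')  # assumed host

    for field in ['Carrier', 'AvgTicketPrice', 'DistanceMiles']:  # assumed fields
        body = {"query": {"exists": {"field": field}}}
        resp = es.count(index='flights', body=body)  # assumed index name
        print(field, resp['count'])  # docs in which the field exists
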
def __getitem__(self, item):
# df['a'] -> item == str
@@ -313,6 +466,8 @@ class DataFrame():
columns.append(item)
elif isinstance(item, tuple):
columns.extend(list(item))
elif isinstance(item, list):
columns.extend(item)
if len(columns) > 0:
# Return new eland.DataFrame with modified mappings
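
With the new list branch, column selection accepts the same three forms as pandas; a quick sketch (df as in the info() sketch above, field names taken from the tests):

    carrier = df['Carrier']                           # single column via str
    pair = df[('Carrier', 'FlightTimeMin')]           # several columns via tuple
    subset = df[['DistanceMiles', 'AvgTicketPrice']]  # several columns via list (new here)
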
@@ -337,13 +492,14 @@ class DataFrame():
# Rendering Methods
def __repr__(self):
"""
Return a string representation for a particular DataFrame.
"""
return self.to_string()
-    def to_string(self):
-        # The return for this is display.options.max_rows
-        max_rows = 60
-        head_rows = max_rows / 2
+    def __fake_dataframe__(self, max_rows=1):
+        head_rows = max_rows / 2 + 1
        tail_rows = max_rows - head_rows
head = self.head(max_rows)
@@ -358,13 +514,33 @@ class DataFrame():
# to use the pandas IO methods.
# TODO - if data is indexed by time series, return top/bottom of
# time series, rather than first max_rows items
if tail_rows > 0:
locations = [0, num_rows-tail_rows]
lengths = [head_rows, tail_rows]
else:
locations = [0]
lengths = [head_rows]
sdf = pd.DataFrame({item: pd.SparseArray(data=head[item],
sparse_index=
BlockIndex(
-                                                     num_rows, [0, num_rows-tail_rows], [head_rows, tail_rows]))
+                                                     num_rows, locations, lengths))
                                for item in self.columns})
-            # TODO - don't hard code max_rows - use pandas default/ES default
-            return sdf.to_string(max_rows=max_rows)
+            return sdf
-        return head.to_string(max_rows=max_rows)
+        return head
def to_string(self):
        # TODO - this doesn't return 'notebook' friendly results yet.
# TODO - don't hard code max_rows - use pandas default/ES default
max_rows = 60
df = self.__fake_dataframe__(max_rows=max_rows)
return df.to_string(max_rows=max_rows, show_dimensions=True)
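
The head-and-tail trick above leans on pandas' BlockIndex: two dense blocks, given as start positions and lengths inside a logical length of num_rows, with the gap rendering as NaN. A toy, self-contained illustration with made-up numbers, using the same pandas 0.25-era import path as this file (later pandas moved/deprecated pd.SparseArray):

    import pandas as pd
    from pandas.core.arrays.sparse import BlockIndex

    # 10 logical rows, of which we 'have' the first 3 and the last 2
    num_rows, head_rows, tail_rows = 10, 3, 2
    data = [1, 2, 3, 9, 10]  # head values followed by tail values
    block_index = BlockIndex(num_rows,
                             [0, num_rows - tail_rows],  # block start positions
                             [head_rows, tail_rows])     # block lengths
    sparse = pd.SparseArray(data, sparse_index=block_index)
    print(pd.DataFrame({'col': sparse}).to_string())  # rows 3..7 print as NaN
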
def _put_str(s, space):
return '{s}'.format(s=s)[:space].ljust(space)


@@ -1,6 +1,8 @@
import warnings
import pandas as pd
class Mappings():
"""
General purpose to manage Elasticsearch to/from pandas mappings
@@ -26,6 +28,7 @@ class Mappings():
origin_location.lat True text object True False
"""
def __init__(self,
client=None,
index_pattern=None,
@@ -63,14 +66,14 @@
            # field_name, es_dtype, pd_dtype, is_searchable, is_aggregatable, is_source
self.mappings_capabilities = Mappings._create_capability_matrix(all_fields, source_fields, all_fields_caps)
else:
-            # Copy object and restrict mapping columns
+            # Reference object and restrict mapping columns
self.mappings_capabilities = mappings.mappings_capabilities.loc[columns]
# Cache source field types for efficient lookup
# (this massively improves performance of DataFrame.flatten)
self.source_field_pd_dtypes = {}
-        for field_name in self.source_fields():
+        for field_name in self.mappings_capabilities[self.mappings_capabilities._source == True].index:
pd_dtype = self.mappings_capabilities.loc[field_name]['pd_dtype']
self.source_field_pd_dtypes[field_name] = pd_dtype
@@ -336,7 +339,7 @@ class Mappings():
source_fields: list of str
List of source fields
"""
-        return self.mappings_capabilities[self.mappings_capabilities._source == True].index.tolist()
+        return self.source_field_pd_dtypes.keys()
def count_source_fields(self):
"""
@@ -345,5 +348,25 @@
count_source_fields: int
Number of source fields in mapping
"""
-        return len(self.mappings_capabilities[self.mappings_capabilities._source == True].index)
+        return len(self.source_fields())
def dtypes(self):
"""
Returns
-------
dtypes: pd.Series
Source field name + pd_dtype
"""
return pd.Series(self.source_field_pd_dtypes)
def get_dtype_counts(self):
"""
Return counts of unique dtypes in this object.
Returns
-------
get_dtype_counts : Series
Series with the count of columns with each dtype.
"""
return pd.Series(self.mappings_capabilities[self.mappings_capabilities._source == True].groupby('pd_dtype')[
'_source'].count().to_dict())
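
Usage sketch for the two new accessors, mirroring test_dtypes/test_get_dtype_counts below (host and index name are placeholders):

    import eland as ed

    mappings = ed.Mappings(ed.Client('localhost:9200'), 'test_mapping1')

    print(mappings.dtypes())            # pd.Series: source field -> pandas dtype
    print(mappings.get_dtype_counts())  # pd.Series: dtype -> number of columns
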


@@ -438,7 +438,11 @@ TEST_MAPPING1_EXPECTED = {
}
TEST_MAPPING1_EXPECTED_DF = pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['es_dtype'])
-TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT = len(TEST_MAPPING1_EXPECTED_DF.index) - 4
+TEST_MAPPING1_EXPECTED_SOURCE_FIELD_DF = TEST_MAPPING1_EXPECTED_DF.drop(index=['city.raw',
+                                                                               'origin_location.lat.keyword',
+                                                                               'origin_location.lon.keyword',
+                                                                               'text.english'])
+TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT = len(TEST_MAPPING1_EXPECTED_SOURCE_FIELD_DF.index)
TEST_NESTED_USER_GROUP_INDEX_NAME = 'nested_user_group'
TEST_NESTED_USER_GROUP_MAPPING = {


@@ -1,47 +1,71 @@
# File called _pytest for PyCharm compatibility
import pytest
-from eland.tests import *
from pandas.util.testing import (
-    assert_almost_equal, assert_frame_equal, assert_series_equal)
+    assert_series_equal, assert_frame_equal)
import eland as ed
+from eland.tests import *
+from eland.tests.common import TestData
-class TestMapping():
+class TestMapping(TestData):
# Requires 'setup_tests.py' to be run prior to this
-    def test_mapping(self):
+    def test_fields(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
-        assert mappings.all_fields() == TEST_MAPPING1_EXPECTED_DF.index.tolist()
+        assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields()
assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype']))
-        assert mappings.count_source_fields() == TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT
+        assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields()
def test_copy(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
-        assert mappings.all_fields() == TEST_MAPPING1_EXPECTED_DF.index.tolist()
+        assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields()
assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype']))
-        assert mappings.count_source_fields() == TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT
+        assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields()
# Pick 1 source field
columns = ['dest_location']
mappings_copy1 = ed.Mappings(mappings=mappings, columns=columns)
-        assert mappings_copy1.all_fields() == columns
-        assert mappings_copy1.count_source_fields() == len(columns)
+        assert columns == mappings_copy1.all_fields()
+        assert len(columns) == mappings_copy1.count_source_fields()
# Pick 3 source fields (out of order)
columns = ['dest_location', 'city', 'user_name']
mappings_copy2 = ed.Mappings(mappings=mappings, columns=columns)
-        assert mappings_copy2.all_fields() == columns
-        assert mappings_copy2.count_source_fields() == len(columns)
+        assert columns == mappings_copy2.all_fields()
+        assert len(columns) == mappings_copy2.count_source_fields()
# Check original is still ok
-        assert mappings.all_fields() == TEST_MAPPING1_EXPECTED_DF.index.tolist()
+        assert TEST_MAPPING1_EXPECTED_DF.index.tolist() == mappings.all_fields()
assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype']))
-        assert mappings.count_source_fields() == TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT
+        assert TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT == mappings.count_source_fields()
def test_dtypes(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
expected_dtypes = pd.Series(
{'city': 'object', 'content': 'object', 'dest_location': 'object', 'email': 'object',
'maps-telemetry.attributesPerMap.dataSourcesCount.avg': 'int64',
'maps-telemetry.attributesPerMap.dataSourcesCount.max': 'int64',
'maps-telemetry.attributesPerMap.dataSourcesCount.min': 'int64',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.avg': 'float64',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.max': 'int64',
'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.min': 'int64',
'my_join_field': 'object', 'name': 'object', 'origin_location.lat': 'object',
'origin_location.lon': 'object', 'text': 'object', 'tweeted_at': 'datetime64[ns]',
'type': 'object', 'user_name': 'object'})
assert_series_equal(expected_dtypes, mappings.dtypes())
def test_get_dtype_counts(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
expected_get_dtype_counts = pd.Series({'datetime64[ns]': 1, 'float64': 1, 'int64': 5, 'object': 11})
assert_series_equal(expected_get_dtype_counts, mappings.get_dtype_counts())


@@ -1,12 +1,11 @@
# File called _pytest for PyCharm compatibility
-from eland.tests.frame.common import TestData
from eland.tests import *
+from eland.tests.common import TestData
import eland as ed
import pandas as pd
import io
from pandas.util.testing import (
-    assert_almost_equal, assert_frame_equal, assert_series_equal)
+    assert_series_equal, assert_frame_equal)
class TestDataFrameIndexing(TestData):
@@ -59,7 +58,7 @@ class TestDataFrameIndexing(TestData):
def test_to_string(self):
print(self.ed_flights())
-    def test_get_item(self):
+    def test_getitem(self):
# Test 1 attribute
ed_carrier = self.ed_flights()['Carrier']
@@ -96,3 +95,72 @@ class TestDataFrameIndexing(TestData):
#ed_3_items_to_string = ed_3_items.to_string()
#print(ed_3_items_to_string)
# Test numerics
numerics = ['DistanceMiles', 'AvgTicketPrice', 'FlightTimeMin']
ed_numerics = self.ed_flights()[numerics]
# just test headers
ed_numerics_describe = ed_numerics.describe()
assert ed_numerics_describe.columns.tolist() == numerics
def test_info(self):
ed_flights_info_buf = io.StringIO()
pd_flights_info_buf = io.StringIO()
self.ed_flights().info(buf=ed_flights_info_buf)
self.pd_flights().info(buf=pd_flights_info_buf)
ed_flights_info = (ed_flights_info_buf.getvalue().splitlines())
pd_flights_info = (pd_flights_info_buf.getvalue().splitlines())
flights_diff = set(ed_flights_info).symmetric_difference(set(pd_flights_info))
ed_ecommerce_info_buf = io.StringIO()
pd_ecommerce_info_buf = io.StringIO()
self.ed_ecommerce().info(buf=ed_ecommerce_info_buf)
self.pd_ecommerce().info(buf=pd_ecommerce_info_buf)
ed_ecommerce_info = (ed_ecommerce_info_buf.getvalue().splitlines())
pd_ecommerce_info = (pd_ecommerce_info_buf.getvalue().splitlines())
# We don't compare ecommerce here as the default dtypes in pandas from read_json
# don't match the mapping types. This is mainly because the products field is
# nested and so can be treated as a multi-field in ES, but not in pandas
ecommerce_diff = set(ed_ecommerce_info).symmetric_difference(set(pd_ecommerce_info))
def test_count(self):
pd_flights_count = self.pd_flights().count()
ed_flights_count = self.ed_flights().count()
assert_series_equal(pd_flights_count, ed_flights_count)
pd_ecommerce_count = self.pd_ecommerce().count()
ed_ecommerce_count = self.ed_ecommerce().count()
assert_series_equal(pd_ecommerce_count, ed_ecommerce_count)
def test_get_dtype_counts(self):
pd_flights_get_dtype_counts = self.pd_flights().get_dtype_counts().sort_index()
ed_flights_get_dtype_counts = self.ed_flights().get_dtype_counts().sort_index()
assert_series_equal(pd_flights_get_dtype_counts, ed_flights_get_dtype_counts)
def test_properties(self):
pd_flights_shape = self.pd_flights().shape
ed_flights_shape = self.ed_flights().shape
assert pd_flights_shape == ed_flights_shape
pd_flights_columns = self.pd_flights().columns
ed_flights_columns = self.ed_flights().columns
assert pd_flights_columns.tolist() == ed_flights_columns.tolist()
pd_flights_dtypes = self.pd_flights().dtypes
ed_flights_dtypes = self.ed_flights().dtypes
assert_series_equal(pd_flights_dtypes, ed_flights_dtypes)

File diff suppressed because it is too large.