Added __getitem__

Implementation copies the DataFrame and replaces the underlying Mappings
object.
Stephen Dodson 2019-06-25 08:41:25 +00:00
parent d4250640f1
commit 9030f84f4c
8 changed files with 3267 additions and 47 deletions
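
For illustration, the selection behaviour this commit enables looks roughly like the following (a usage sketch based on the tests further down; the host and index pattern are placeholders):

import eland as ed

df = ed.read_es('localhost', 'flights')

# Single column: returns a new eland.DataFrame restricted to one source field
carrier = df['Carrier']
print(carrier.head(5))

# Multiple columns (a tuple): returns a new eland.DataFrame carrying a copied,
# restricted Mappings object; the original DataFrame is unchanged
subset = df['Dest', 'Carrier', 'FlightDelay']
print(subset.head(5))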

View File

@@ -7,6 +7,8 @@ class Client():
def __init__(self, es=None):
if isinstance(es, Elasticsearch):
self.es = es
elif isinstance(es, Client):
self.es = es.es
else:
self.es = Elasticsearch(es)
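
The constructor above accepts an existing Elasticsearch client, another eland Client, or anything Elasticsearch() itself accepts (e.g. a host string). A minimal sketch, with the host as a placeholder:

from elasticsearch import Elasticsearch
import eland as ed

es = Elasticsearch('localhost')
client1 = ed.Client(es)           # wrap an existing Elasticsearch client
client2 = ed.Client(client1)      # reuse the underlying client of another ed.Client
client3 = ed.Client('localhost')  # build a new Elasticsearch client from params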

View File

@@ -44,6 +44,9 @@ class DataFrame():
index_pattern : str
An Elasticsearch index pattern. This can contain wildcards (e.g. filebeat-*).
operations: list of operations
A list of Elasticsearch analytics operations, e.g. filters, aggregations, etc.
See Also
--------
@@ -69,13 +72,26 @@ class DataFrame():
object is created, the object is not rebuilt and so inconsistencies can occur.
"""
def __init__(self, client, index_pattern):
def __init__(self,
client,
index_pattern,
mappings=None,
operations=None):
self.client = ed.Client(client)
self.index_pattern = index_pattern
# Get and persist mappings; this allows us to correctly
# map returned types from Elasticsearch to pandas datatypes
self.mappings = ed.Mappings(self.client, self.index_pattern)
if mappings is None:
self.mappings = ed.Mappings(self.client, self.index_pattern)
else:
self.mappings = mappings
# Initialise a list of 'operations'
# these are filters
self.operations = []
if operations is not None:
self.operations.extend(operations)
def _es_results_to_pandas(self, results):
"""
@@ -174,7 +190,7 @@ class DataFrame():
is_source_field = False
pd_dtype = 'object'
else:
is_source_field, pd_dtype = self.mappings.is_source_field(name[:-1])
is_source_field, pd_dtype = self.mappings.source_field_pd_dtype(name[:-1])
if not is_source_field and type(x) is dict:
for a in x:
@@ -182,7 +198,7 @@ class DataFrame():
elif not is_source_field and type(x) is list:
for a in x:
flatten(a, name)
else:
elif is_source_field:  # only print source fields from mappings (TODO - inefficient for a large number of fields and a filtered mapping)
field_name = name[:-1]
# Coerce types - for now just datetime
@@ -213,6 +229,19 @@ class DataFrame():
# Create pandas DataFrame
df = pd.DataFrame(data=rows)
# _source may not contain all columns in the mapping
# therefore, fill in missing columns
# (note this returns self.columns NOT IN df.columns)
missing_columns = list(set(self.columns) - set(df.columns))
for missing in missing_columns:
is_source_field, pd_dtype = self.mappings.source_field_pd_dtype(missing)
df[missing] = None
df[missing] = df[missing].astype(pd_dtype)
# Sort columns in mapping order
df = df[self.columns]
return df
def head(self, n=5):
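
Because a document's _source may omit fields that exist in the mapping, the hunk above backfills those columns before re-ordering. The core of that step in plain pandas (column names and dtype are illustrative):

import pandas as pd

df = pd.DataFrame({'a': [1, 2]})
mapped_columns = ['a', 'b']  # stand-in for self.columns

missing_columns = list(set(mapped_columns) - set(df.columns))
for missing in missing_columns:
    df[missing] = None
    df[missing] = df[missing].astype('object')

df = df[mapped_columns]  # sort columns into mapping order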
@@ -266,7 +295,7 @@ class DataFrame():
1 - number of columns
"""
num_rows = len(self)
num_columns = self.columns
num_columns = len(self.columns)
return num_rows, num_columns
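
After this fix, shape returns the familiar pandas-style tuple rather than a row count paired with a column list; for an eland DataFrame df as above:

num_rows, num_columns = df.shape  # equivalent to (len(df), len(df.columns))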
@@ -275,15 +304,28 @@ class DataFrame():
return self.mappings.source_fields()
def __getitem__(self, item):
# df['a'] -> item == str
# df['a', 'b'] -> item == (str, str) tuple
columns = []
if isinstance(item, str):
if item not in self.mappings.is_source_field(item):
if not self.mappings.is_source_field(item):
raise TypeError('Column does not exist: [{0}]'.format(item))
return Column(item)
columns.append(item)
elif isinstance(item, tuple):
columns.extend(list(item))
if len(columns) > 0:
# Return new eland.DataFrame with modified mappings
mappings = ed.Mappings(mappings=self.mappings, columns=columns)
return DataFrame(self.client, self.index_pattern, mappings=mappings)
"""
elif isinstance(item, BooleanFilter):
self._filter = item.build()
return self
else:
raise TypeError('Unsupported expr: [{0}]'.format(item))
"""
def __len__(self):
"""
@@ -295,6 +337,10 @@ class DataFrame():
# Rendering Methods
def __repr__(self):
return self.to_string()
def to_string(self):
# The row limit for this rendering comes from display.options.max_rows
max_rows = 60
head_rows = max_rows // 2
@@ -310,6 +356,8 @@ class DataFrame():
# NOTE: this sparse DataFrame can't be used as the middle
# section is all NaNs. However, it gives us potentially a nice way
# to use the pandas IO methods.
# TODO - if data is indexed by time series, return top/bottom of
# time series, rather than first max_rows items
sdf = pd.DataFrame({item: pd.SparseArray(data=head[item],
sparse_index=
BlockIndex(
@@ -320,4 +368,3 @@ class DataFrame():
return sdf.to_string(max_rows=max_rows)
return head.to_string(max_rows=max_rows)
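
The to_string() changes render only the first and last max_rows // 2 rows, using a sparse DataFrame so pandas' own truncated display can bridge the gap. The same head-plus-tail idea, sketched with a plain concat rather than the commit's SparseArray/BlockIndex construction:

import pandas as pd

def truncated_repr(head, tail):
    # head and tail are the first and last max_rows // 2 rows of the result.
    # Unlike the commit's sparse construction, this simply concatenates
    # them, so no '...' separator appears between the two halves.
    return pd.concat([head, tail]).to_string()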

View File

@@ -26,7 +26,11 @@ class Mappings():
origin_location.lat True text object True False
"""
def __init__(self, client, index_pattern):
def __init__(self,
client=None,
index_pattern=None,
mappings=None,
columns=None):
"""
Parameters
----------
@@ -35,29 +39,38 @@ class Mappings():
index_pattern: str
Elasticsearch index pattern
Copy constructor arguments
mappings: Mappings
Object to copy
columns: list of str
Columns to copy
"""
# persist index_pattern for debugging
self.index_pattern = index_pattern
if (client is not None) and (index_pattern is not None):
get_mapping = client.indices().get_mapping(index=index_pattern)
mappings = client.indices().get_mapping(index=index_pattern)
# Get all fields (including all nested) and then field_caps
# for these names (fields=* doesn't appear to work effectively...)
all_fields = Mappings._extract_fields_from_mapping(get_mapping)
all_fields_caps = client.field_caps(index=index_pattern, fields=list(all_fields.keys()))
# Get all fields (including all nested) and then field_caps
# for these names (fields=* doesn't appear to work effectively...)
all_fields = Mappings._extract_fields_from_mapping(mappings)
all_fields_caps = client.field_caps(index=index_pattern, fields=list(all_fields.keys()))
# Get top level (not sub-field multifield) mappings
source_fields = Mappings._extract_fields_from_mapping(get_mapping, source_only=True)
# Get top level (not sub-field multifield) mappings
source_fields = Mappings._extract_fields_from_mapping(mappings, source_only=True)
# Populate capability matrix of fields
# field_name, es_dtype, pd_dtype, is_searchable, is_aggregatable, is_source
self.mappings_capabilities = Mappings._create_capability_matrix(all_fields, source_fields, all_fields_caps)
# Populate capability matrix of fields
# field_name, es_dtype, pd_dtype, is_searchable, is_aggregatable, is_source
self.mappings_capabilities = Mappings._create_capability_matrix(all_fields, source_fields, all_fields_caps)
else:
# Copy object and restrict mapping columns
self.mappings_capabilities = mappings.mappings_capabilities.loc[columns]
# Cache source field types for efficient lookup
# (this massively improves performance of DataFrame.flatten)
self.source_field_pd_dtypes = {}
for field_name in source_fields:
for field_name in self.source_fields():
pd_dtype = self.mappings_capabilities.loc[field_name]['pd_dtype']
self.source_field_pd_dtypes[field_name] = pd_dtype
@@ -265,7 +278,7 @@ class Mappings():
def pd_dtype
"""
def is_source_field(self, field_name):
def source_field_pd_dtype(self, field_name):
"""
Parameters
----------
@@ -287,6 +300,24 @@ class Mappings():
return is_source_field, pd_dtype
def is_source_field(self, field_name):
"""
Parameters
----------
field_name: str
Returns
-------
is_source_field: bool
Is this field name a top-level source field?
"""
is_source_field = False
if field_name in self.source_field_pd_dtypes:
is_source_field = True
return is_source_field
def numeric_source_fields(self):
"""
Returns
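
Taken together, the Mappings changes let callers restrict the capability matrix to a subset of source fields without re-querying Elasticsearch. A usage sketch (host and index pattern are placeholders; field names follow the tests below):

import eland as ed

mappings = ed.Mappings(ed.Client('localhost'), 'my-index-*')

# Copy, keeping only two source fields; the original Mappings is untouched
subset = ed.Mappings(mappings=mappings, columns=['city', 'user_name'])

subset.is_source_field('city')        # True
subset.source_field_pd_dtype('city')  # (is_source_field, pd_dtype) pair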

View File

@@ -20,5 +20,28 @@ class TestMapping():
assert mappings.count_source_fields() == TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT
def test_copy(self):
mappings = ed.Mappings(ed.Client(ELASTICSEARCH_HOST), TEST_MAPPING1_INDEX_NAME)
assert mappings.all_fields() == TEST_MAPPING1_EXPECTED_DF.index.tolist()
assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype']))
assert mappings.count_source_fields() == TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT
# Pick 1 source field
columns = ['dest_location']
mappings_copy1 = ed.Mappings(mappings=mappings, columns=columns)
assert mappings_copy1.all_fields() == columns
assert mappings_copy1.count_source_fields() == len(columns)
# Pick 3 source fields (out of order)
columns = ['dest_location', 'city', 'user_name']
mappings_copy2 = ed.Mappings(mappings=mappings, columns=columns)
assert mappings_copy2.all_fields() == columns
assert mappings_copy2.count_source_fields() == len(columns)
# Check original is still ok
assert mappings.all_fields() == TEST_MAPPING1_EXPECTED_DF.index.tolist()
assert_frame_equal(TEST_MAPPING1_EXPECTED_DF, pd.DataFrame(mappings.mappings_capabilities['es_dtype']))
assert mappings.count_source_fields() == TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT

View File

@@ -23,6 +23,8 @@ _pd_ecommerce['order_date'] = \
pd.to_datetime(_pd_ecommerce['order_date'])
_pd_ecommerce['products.created_on'] = \
_pd_ecommerce['products.created_on'].apply(lambda x: pd.to_datetime(x))
_pd_ecommerce.insert(2, 'customer_birth_date', None)
_pd_ecommerce['customer_birth_date'] = \
_pd_ecommerce['customer_birth_date'].astype('datetime64')
_ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME)
class TestData:

View File

@@ -59,3 +59,40 @@ class TestDataFrameIndexing(TestData):
def test_to_string(self):
print(self.ed_flights())
def test_get_item(self):
# Test 1 attribute
ed_carrier = self.ed_flights()['Carrier']
carrier_head = ed_carrier.head(5)
carrier_head_expected = pd.DataFrame(
{'Carrier':[
'Kibana Airlines',
'Logstash Airways',
'Logstash Airways',
'Kibana Airlines',
'Kibana Airlines'
]})
assert_frame_equal(carrier_head_expected, carrier_head)
#carrier_to_string = ed_carrier.to_string()
#print(carrier_to_string)
# Test multiple attributes (out of order)
ed_3_items = self.ed_flights()['Dest','Carrier','FlightDelay']
ed_3_items_head = ed_3_items.head(5)
ed_3_items_expected = pd.DataFrame(dict(
Dest={0: 'Sydney Kingsford Smith International Airport', 1: 'Venice Marco Polo Airport',
2: 'Venice Marco Polo Airport', 3: "Treviso-Sant'Angelo Airport",
4: "Xi'an Xianyang International Airport"},
Carrier={0: 'Kibana Airlines', 1: 'Logstash Airways', 2: 'Logstash Airways', 3: 'Kibana Airlines',
4: 'Kibana Airlines'},
FlightDelay={0: False, 1: False, 2: False, 3: True, 4: False}))
assert_frame_equal(ed_3_items_expected, ed_3_items_head)
#ed_3_items_to_string = ed_3_items.to_string()
#print(ed_3_items_to_string)

File diff suppressed because it is too large.

View File

@@ -1,4 +1,4 @@
import eland
import eland as ed
def read_es(es_params, index_pattern):
return eland.DataFrame(es_params, index_pattern)
return ed.DataFrame(client=es_params, index_pattern=index_pattern)
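
With read_es now forwarding keyword arguments explicitly, typical usage remains (host and index pattern are placeholders):

import eland as ed

df = ed.read_es('localhost:9200', 'filebeat-*')  # es_params, index_pattern
print(df.shape)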