diff --git a/eland/__init__.py b/eland/__init__.py index f82d933..eee58ae 100644 --- a/eland/__init__.py +++ b/eland/__init__.py @@ -1,7 +1,7 @@ -from .utils import * from .client import * from .ndframe import * from .index import * from .mappings import * -from .dataframe import * from .series import * +from .dataframe import * +from .utils import * diff --git a/eland/client.py b/eland/client.py index aa1a6a1..3b1231f 100644 --- a/eland/client.py +++ b/eland/client.py @@ -1,4 +1,5 @@ from elasticsearch import Elasticsearch +from elasticsearch import helpers class Client(): """ @@ -17,7 +18,13 @@ class Client(): def indices(self): return self.es.indices - + + def bulk(self, actions, refresh=False): + return helpers.bulk(self.es, actions, refresh=refresh) + + def scan(self, **kwargs): + return helpers.scan(self.es, **kwargs) + def search(self, **kwargs): return self.es.search(**kwargs) diff --git a/eland/dataframe.py b/eland/dataframe.py index 35bfd4d..b4ed1c4 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -32,9 +32,14 @@ from pandas.io.formats.printing import pprint_thing from pandas.compat import StringIO from pandas.io.common import _expand_user, _stringify_path from pandas.io.formats import console +from pandas.core import common as com from eland import NDFrame from eland import Index +from eland import Series + + + class DataFrame(NDFrame): @@ -217,10 +222,6 @@ class DataFrame(NDFrame): return num_rows, num_columns - @property - def columns(self): - return super()._columns - def set_index(self, index_field): copy = self.copy() copy._index = Index(index_field) @@ -265,7 +266,6 @@ class DataFrame(NDFrame): def __getitem__(self, key): # NOTE: there is a difference between pandas here. # e.g. df['a'] returns pd.Series, df[['a','b']] return pd.DataFrame - # we always return DataFrame - TODO maybe create eland.Series at some point... 
# Implementation mainly copied from pandas v0.24.2 # (https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html) @@ -291,10 +291,12 @@ class DataFrame(NDFrame): # We are left with two options: a single key, and a collection of keys, columns = [] + is_single_key = False if isinstance(key, str): if not self._mappings.is_source_field(key): raise TypeError('Column does not exist: [{0}]'.format(key)) columns.append(key) + is_single_key = True elif isinstance(key, list): columns.extend(key) else: @@ -303,7 +305,18 @@ class DataFrame(NDFrame): mappings = self._filter_mappings(columns) # Return new eland.DataFrame with modified mappings - return DataFrame(self._client, self._index_pattern, mappings=mappings) + if is_single_key: + return Series(self._client, self._index_pattern, mappings=mappings) + else: + return DataFrame(self._client, self._index_pattern, mappings=mappings) + + + def __getattr__(self, name): + # Note: obj.x will always call obj.__getattribute__('x') prior to + # calling obj.__getattr__('x'). 
+ mappings = self._filter_mappings([name]) + + return Series(self._client, self._index_pattern, mappings=mappings) def copy(self): # TODO - test and validate...may need deep copying @@ -373,7 +386,8 @@ class DataFrame(NDFrame): result = _buf.getvalue() return result - + def to_pandas(self): + return super()._to_pandas() # From pandas.DataFrame def _put_str(s, space): diff --git a/eland/mappings.py b/eland/mappings.py index b9e31e7..1dc0bee 100644 --- a/eland/mappings.py +++ b/eland/mappings.py @@ -2,6 +2,7 @@ import warnings import pandas as pd +from pandas.core.dtypes.common import (is_float_dtype, is_bool_dtype, is_integer_dtype, is_datetime_or_timedelta_dtype, is_string_dtype) class Mappings(): """ @@ -217,6 +218,7 @@ class Mappings(): return capability_matrix_df.sort_index() + @staticmethod def _es_dtype_to_pd_dtype(es_dtype): """ Mapping Elasticsearch types to pandas dtypes @@ -259,6 +261,84 @@ class Mappings(): # Return 'object' for all unsupported TODO - investigate how different types could be supported return 'object' + @staticmethod + def _pd_dtype_to_es_dtype(pd_dtype): + """ + Mapping pandas dtypes to Elasticsearch dtype + -------------------------------------------- + + ``` + Pandas dtype Python type NumPy type Usage + object str string_, unicode_ Text + int64 int int_, int8, int16, int32, int64, uint8, uint16, uint32, uint64 Integer numbers + float64 float float_, float16, float32, float64 Floating point numbers + bool bool bool_ True/False values + datetime64 NA datetime64[ns] Date and time values + timedelta[ns] NA NA Differences between two datetimes + category NA NA Finite list of text values + ``` + """ + es_dtype = None + + # Map all to 64-bit - TODO map to specifics: int32 -> int etc. 
+ if is_float_dtype(pd_dtype): + es_dtype = 'double' + elif is_integer_dtype(pd_dtype): + es_dtype = 'long' + elif is_bool_dtype(pd_dtype): + es_dtype = 'boolean' + elif is_string_dtype(pd_dtype): + es_dtype = 'keyword' + elif is_datetime_or_timedelta_dtype(pd_dtype): + es_dtype = 'date' + else: + warnings.warn('No mapping for pd_dtype: [{0}], using default mapping'.format(pd_dtype)) + + return es_dtype + + @staticmethod + def _generate_es_mappings(dataframe): + """Given a pandas dataframe, generate the associated Elasticsearch mapping + + Parameters + ---------- + dataframe : pandas.DataFrame + pandas.DataFrame to create schema from + + Returns + ------- + mapping : str + """ + + """ + "mappings" : { + "properties" : { + "AvgTicketPrice" : { + "type" : "float" + }, + "Cancelled" : { + "type" : "boolean" + }, + "Carrier" : { + "type" : "keyword" + }, + "Dest" : { + "type" : "keyword" + } + } + } + """ + + mappings = {} + mappings['properties'] = {} + for column_name, dtype in dataframe.dtypes.iteritems(): + es_dtype = Mappings._pd_dtype_to_es_dtype(dtype) + + mappings['properties'][column_name] = {} + mappings['properties'][column_name]['type'] = es_dtype + + return {"mappings": mappings} + def all_fields(self): """ Returns @@ -379,3 +459,14 @@ class Mappings(): """ return pd.Series(self._mappings_capabilities[self._mappings_capabilities._source == True].groupby('pd_dtype')[ '_source'].count().to_dict()) + + def to_pandas(self): + """ + + Returns + ------- + df : pd.DataFrame + pandas DataFrame representing this index + """ + + diff --git a/eland/ndframe.py b/eland/ndframe.py index 6f52e1c..9fbd312 100644 --- a/eland/ndframe.py +++ b/eland/ndframe.py @@ -23,10 +23,14 @@ only Elasticsearch aggregatable fields can be aggregated or grouped. 
""" import pandas as pd +import functools from elasticsearch_dsl import Search import eland as ed +from pandas.core.generic import NDFrame as pd_NDFrame +from pandas._libs import Timestamp, iNaT, properties + class NDFrame(): """ @@ -44,7 +48,6 @@ class NDFrame(): -------- """ - def __init__(self, client, index_pattern, @@ -191,7 +194,12 @@ class NDFrame(): rows = [] index = [] - for hit in results['hits']['hits']: + if isinstance(results, dict): + iterator = results['hits']['hits'] + else: + iterator = results + + for hit in iterator: row = hit['_source'] # get index value - can be _id or can be field value in source @@ -255,6 +263,23 @@ class NDFrame(): # reverse order (index ascending) return df.sort_index() + def _to_pandas(self): + """ + Protected method that returns all data as pandas.DataFrame. + + Returns + ------- + df + pandas.DataFrame of all values + """ + sort_params = self._index.sort_field + ":asc" + + results = self._client.scan(index=self._index_pattern) + + # We sort here rather than in scan - once everything is in core this + # should be faster + return self._es_results_to_pandas(results) + def _describe(self): numeric_source_fields = self._mappings.numeric_source_fields() @@ -294,6 +319,10 @@ class NDFrame(): return mappings + @property + def columns(self): + return self._columns + @property def index(self): return self._index @@ -309,7 +338,6 @@ class NDFrame(): def get_dtype_counts(self): return self._mappings.get_dtype_counts() - def _index_count(self): """ Returns diff --git a/eland/series.py b/eland/series.py index 735da96..47473ed 100644 --- a/eland/series.py +++ b/eland/series.py @@ -72,14 +72,17 @@ class Series(NDFrame): def __init__(self, client, index_pattern, - field_name, + field_name=None, mappings=None, index_field=None): # python 3 syntax super().__init__(client, index_pattern, mappings=mappings, index_field=index_field) # now select column (field_name) - self._mappings = self._filter_mappings([field_name]) + if field_name is not 
None: + self._mappings = self._filter_mappings([field_name]) + elif len(self._mappings.source_fields()) != 1: + raise TypeError('Series must have 1 field: [{0}]'.format(len(self._mappings.source_fields()))) def head(self, n=5): return self._df_to_series(super()._head(n)) @@ -199,6 +202,10 @@ class Series(NDFrame): fmt.buffer_put_lines(buf, lines) + @property + def name(self): + return list(self._mappings.source_fields())[0] + @property def shape(self): """ @@ -257,7 +264,7 @@ class Series(NDFrame): return super()._describe() def _df_to_series(self, df): - return df.iloc[:, 0] + return df[self.name] # ---------------------------------------------------------------------- # Rendering Methods @@ -269,8 +276,8 @@ class Series(NDFrame): max_rows = pd.get_option("display.max_rows") - self.to_string(buf=buf, na_rep='NaN', float_format=None, header=True, index=True, length=False, - dtype=False, name=False, max_rows=max_rows) + self.to_string(buf=buf, na_rep='NaN', float_format=None, header=True, index=True, length=True, + dtype=True, name=True, max_rows=max_rows) return buf.getvalue() @@ -279,7 +286,7 @@ class Series(NDFrame): index=True, length=True, dtype=True, name=True, max_rows=None): """ - From pandas + From pandas 0.24.2 Render a string representation of the Series. 
@@ -343,7 +350,6 @@ class Series(NDFrame): """ A hacked overridden version of pandas.io.formats.SeriesFormatter that writes correct length """ - def __init__(self, series, series_length, buf=None, length=True, header=True, index=True, na_rep='NaN', name=False, float_format=None, dtype=True, max_rows=None): diff --git a/eland/tests/client/test_mappings_pytest.py b/eland/tests/client/test_mappings_pytest.py index 5e50f17..c19d43f 100644 --- a/eland/tests/client/test_mappings_pytest.py +++ b/eland/tests/client/test_mappings_pytest.py @@ -1,5 +1,6 @@ # File called _pytest for PyCharm compatability +import numpy as np from pandas.util.testing import ( assert_series_equal, assert_frame_equal) @@ -88,3 +89,36 @@ class TestMapping(TestData): assert 'object' == field_capabilities['pd_dtype'] assert True == field_capabilities['searchable'] assert True == field_capabilities['aggregatable'] + + def test_generate_es_mappings(self): + df = pd.DataFrame(data={'A': np.random.rand(3), + 'B': 1, + 'C': 'foo', + 'D': pd.Timestamp('20190102'), + 'E': [1.0, 2.0, 3.0], + 'F': False, + 'G': [1, 2, 3]}, + index=['0','1','2']) + + expected_mappings = {'mappings': { + 'properties': {'A': {'type': 'double'}, + 'B': {'type': 'long'}, + 'C': {'type': 'keyword'}, + 'D': {'type': 'date'}, + 'E': {'type': 'double'}, + 'F': {'type': 'boolean'}, + 'G': {'type': 'long'}}}} + + mappings = ed.Mappings._generate_es_mappings(df) + + assert expected_mappings == mappings + + # Now create index + index_name = 'eland_test_generate_es_mappings' + + ed.pandas_to_es(df, ELASTICSEARCH_HOST, index_name, if_exists="replace", refresh=True) + + ed_df = ed.DataFrame(ELASTICSEARCH_HOST, index_name) + ed_df_head = ed_df.head() + + assert_frame_equal(df, ed_df_head) diff --git a/eland/tests/dataframe/test_basics_pytest.py b/eland/tests/dataframe/test_basics_pytest.py index 73e5fb1..dce92e9 100644 --- a/eland/tests/dataframe/test_basics_pytest.py +++ b/eland/tests/dataframe/test_basics_pytest.py @@ -153,3 +153,7 @@ 
class TestDataFrameBasics(TestData): ed_flights_timestamp.info() ed_flights.info() + def test_to_pandas(self): + ed_ecommerce_pd_df = self.ed_ecommerce().to_pandas() + + assert_frame_equal(self.pd_ecommerce(), ed_ecommerce_pd_df) diff --git a/eland/tests/dataframe/test_getitem_pytest.py b/eland/tests/dataframe/test_getitem_pytest.py index 0b6a5bf..e9b58ba 100644 --- a/eland/tests/dataframe/test_getitem_pytest.py +++ b/eland/tests/dataframe/test_getitem_pytest.py @@ -15,7 +15,7 @@ class TestDataFrameGetItem(TestData): ed_carrier = self.ed_flights()['Carrier'] # pandas returns a Series here - assert_frame_equal(pd.DataFrame(pd_carrier.head(100)), ed_carrier.head(100)) + assert_series_equal(pd_carrier.head(100), ed_carrier.head(100)) pd_3_items = self.pd_flights()[['Dest','Carrier','FlightDelay']] ed_3_items = self.ed_flights()[['Dest','Carrier','FlightDelay']] @@ -36,28 +36,12 @@ class TestDataFrameGetItem(TestData): def test_getattr_basic(self): # Test 1 attribute pd_carrier = self.pd_flights().Carrier - #ed_carrier = self.ed_flights().Carrier + ed_carrier = self.ed_flights().Carrier - print(type(pd_carrier)) - print(pd_carrier) + assert_series_equal(pd_carrier.head(100), ed_carrier.head(100)) - def test_boolean(self): - # Test 1 attribute - pd_carrier = self.pd_flights()['Carrier == "Kibana Airlines"'] - #ed_carrier = self.ed_flights().Carrier + pd_avgticketprice = self.pd_flights().AvgTicketPrice + ed_avgticketprice = self.ed_flights().AvgTicketPrice - print(type(pd_carrier)) - print(pd_carrier) - - - def test_loc(self): - pd = self.pd_flights().loc[10:15, ['Dest', 'Carrier']] - - print(type(pd)) - print(pd) - - pd = self.pd_flights().loc[10] - - print(type(pd)) - print(pd) + assert_series_equal(pd_avgticketprice.head(100), ed_avgticketprice.head(100)) diff --git a/eland/utils.py b/eland/utils.py index 99f597c..074785b 100644 --- a/eland/utils.py +++ b/eland/utils.py @@ -1,4 +1,73 @@ -import eland as ed +from eland import Client +from eland import DataFrame +from 
eland import Mappings def read_es(es_params, index_pattern): - return ed.DataFrame(client=es_params, index_pattern=index_pattern) + return DataFrame(client=es_params, index_pattern=index_pattern) + +def pandas_to_es(df, es_params, destination_index, if_exists='fail', chunk_size=10000, refresh=False): + """ + Append a pandas DataFrame to an Elasticsearch index. + Mainly used in testing. + + Parameters + ---------- + es_params : Elasticsearch client argument + elasticsearch-py parameters or + elasticsearch-py instance or + eland.Client instance + + destination_index : str + Name of Elasticsearch index to be written + + if_exists : str, default 'fail' + Behavior when the destination index exists. Value can be one of: + ``'fail'`` + If table exists, do nothing. + ``'replace'`` + If table exists, drop it, recreate it, and insert data. + ``'append'`` + If table exists, insert data. Create if does not exist. + """ + client = Client(es_params) + + mapping = Mappings._generate_es_mappings(df) + + # If table exists, check if_exists parameter + if client.indices().exists(destination_index): + if if_exists == "fail": + raise ValueError( + "Could not create the index [{0}] because it " + "already exists. 
" + "Change the if_exists parameter to " + "'append' or 'replace' data.".format(destination_index) + ) + elif if_exists == "replace": + client.indices().delete(destination_index) + client.indices().create(destination_index, mapping) + #elif if_exists == "append": + # TODO validate mapping is compatible + else: + client.indices().create(destination_index, mapping) + + # Now add data + actions = [] + n = 0 + for row in df.iterrows(): + # Use index as _id + id = row[0] + values = row[1].to_dict() + + # Use integer as id field for repeatable results + action = {'_index': destination_index, '_source': values, '_id': str(id)} + + actions.append(action) + + n = n + 1 + + if n % chunk_size == 0: + client.bulk(actions, refresh=refresh) + actions = [] + + client.bulk(actions, refresh=refresh) +