diff --git a/eland/dataframe.py b/eland/dataframe.py index a7e45c4..c0995c7 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -1,6 +1,11 @@ -from eland import NDFrame +import warnings import pandas as pd +from pandas.compat import StringIO +from pandas.io.common import _expand_user, _stringify_path +from pandas.io.formats import console + +from eland import NDFrame class DataFrame(NDFrame): @@ -45,14 +50,64 @@ class DataFrame(NDFrame): return super().tail(n) def __repr__(self): - num_rows = pd.get_option("max_rows") or 60 - num_cols = pd.get_option("max_columns") or 20 + """ + From pandas + """ + buf = StringIO() - result = repr(self._build_repr_df(num_rows, num_cols)) - if len(self.index) > num_rows or len(self.columns) > num_cols: - # The split here is so that we don't repr pandas row lengths. - return result.rsplit("\n\n", 1)[0] + "\n\n[{0} rows x {1} columns]".format( - len(self.index), len(self.columns) - ) + max_rows = pd.get_option("display.max_rows") + max_cols = pd.get_option("display.max_columns") + show_dimensions = pd.get_option("display.show_dimensions") + if pd.get_option("display.expand_frame_repr"): + width, _ = console.get_console_size() else: + width = None + self.to_string(buf=buf, max_rows=max_rows, max_cols=max_cols, + line_width=width, show_dimensions=show_dimensions) + + return buf.getvalue() + + def to_string(self, buf=None, columns=None, col_space=None, header=True, + index=True, na_rep='NaN', formatters=None, float_format=None, + sparsify=None, index_names=True, justify=None, + max_rows=None, max_cols=None, show_dimensions=False, + decimal='.', line_width=None): + """ + From pandas - except we set max_rows default to avoid careless + """ + if max_rows is None: + warnings.warn("DataFrame.to_string called without max_rows set " + "- this will return entire index results. " + "Setting max_rows=60, overwrite if different behaviour is required.") + max_rows = 60 + + # Create a slightly bigger dataframe than display + df = self._build_repr_df(max_rows+1, max_cols) + + if buf is not None: + _buf = _expand_user(_stringify_path(buf)) + else: + _buf = StringIO() + + df.to_string(buf=_buf, columns=columns, + col_space=col_space, na_rep=na_rep, + formatters=formatters, + float_format=float_format, + sparsify=sparsify, justify=justify, + index_names=index_names, + header=header, index=index, + max_rows=max_rows, + max_cols=max_cols, + show_dimensions=False, # print this outside of this call + decimal=decimal, + line_width=line_width) + + # Our fake dataframe has incorrect number of rows (max_rows*2+1) - write out + # the correct number of rows + if show_dimensions: + _buf.write("\n\n[{nrows} rows x {ncols} columns]" + .format(nrows=len(self.index), ncols=len(self.columns))) + + if buf is None: + result = _buf.getvalue() return result diff --git a/eland/ndframe.py b/eland/ndframe.py index c1b7f5b..383e298 100644 --- a/eland/ndframe.py +++ b/eland/ndframe.py @@ -60,17 +60,18 @@ class NDFrame(BasePandasDataset): # Overriden version of BasePandasDataset._build_repr_df # to avoid issues with concat if len(self.index) <= num_rows: - return self.to_pandas() + return self._to_pandas() - num_rows = num_rows + 1 + num_rows = num_rows head_rows = int(num_rows / 2) + num_rows % 2 tail_rows = num_rows - head_rows - head = self.head(head_rows).to_pandas() - tail = self.tail(tail_rows).to_pandas() + head = self.head(head_rows)._to_pandas() + tail = self.tail(tail_rows)._to_pandas() return head.append(tail) - def to_pandas(self): + def _to_pandas(self): return self._query_compiler.to_pandas() + diff --git a/eland/operations.py b/eland/operations.py index 8705c4d..ec18a14 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -129,7 +129,6 @@ class Operations: @staticmethod def _apply_df_post_processing(df, post_processing): for action in post_processing: - print(action) if action == 'sort_index': df = df.sort_index() elif action[0] == 'head': diff --git a/eland/tests/__init__.py b/eland/tests/__init__.py new file mode 100644 index 0000000..d0b1ad8 --- /dev/null +++ b/eland/tests/__init__.py @@ -0,0 +1,490 @@ +import os +import pandas as pd + +ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) + +# Set modin to pandas to avoid starting ray or other +os.environ["MODIN_ENGINE"] = 'python' +os.environ["MODIN_BACKEND"] = 'pandas' + +# Define test files and indices +ELASTICSEARCH_HOST = 'localhost' # TODO externalise this + +FLIGHTS_INDEX_NAME = 'flights' +FLIGHTS_MAPPING = { "mappings" : { + "properties" : { + "AvgTicketPrice" : { + "type" : "float" + }, + "Cancelled" : { + "type" : "boolean" + }, + "Carrier" : { + "type" : "keyword" + }, + "Dest" : { + "type" : "keyword" + }, + "DestAirportID" : { + "type" : "keyword" + }, + "DestCityName" : { + "type" : "keyword" + }, + "DestCountry" : { + "type" : "keyword" + }, + "DestLocation" : { + "type" : "geo_point" + }, + "DestRegion" : { + "type" : "keyword" + }, + "DestWeather" : { + "type" : "keyword" + }, + "DistanceKilometers" : { + "type" : "float" + }, + "DistanceMiles" : { + "type" : "float" + }, + "FlightDelay" : { + "type" : "boolean" + }, + "FlightDelayMin" : { + "type" : "integer" + }, + "FlightDelayType" : { + "type" : "keyword" + }, + "FlightNum" : { + "type" : "keyword" + }, + "FlightTimeHour" : { + "type" : "float" + }, + "FlightTimeMin" : { + "type" : "float" + }, + "Origin" : { + "type" : "keyword" + }, + "OriginAirportID" : { + "type" : "keyword" + }, + "OriginCityName" : { + "type" : "keyword" + }, + "OriginCountry" : { + "type" : "keyword" + }, + "OriginLocation" : { + "type" : "geo_point" + }, + "OriginRegion" : { + "type" : "keyword" + }, + "OriginWeather" : { + "type" : "keyword" + }, + "dayOfWeek" : { + "type" : "integer" + }, + "timestamp" : { + "type" : "date" + } + } + } } +FLIGHTS_FILE_NAME = ROOT_DIR + '/flights.json.gz' +FLIGHTS_DF_FILE_NAME = ROOT_DIR + '/flights_df.json.gz' + +ECOMMERCE_INDEX_NAME = 'ecommerce' +ECOMMERCE_MAPPING = { "mappings" : { + "properties" : { + "category" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "currency" : { + "type" : "keyword" + }, + "customer_birth_date" : { + "type" : "date" + }, + "customer_first_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "customer_full_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "customer_gender" : { + "type" : "keyword" + }, + "customer_id" : { + "type" : "keyword" + }, + "customer_last_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "customer_phone" : { + "type" : "keyword" + }, + "day_of_week" : { + "type" : "keyword" + }, + "day_of_week_i" : { + "type" : "integer" + }, + "email" : { + "type" : "keyword" + }, + "geoip" : { + "properties" : { + "city_name" : { + "type" : "keyword" + }, + "continent_name" : { + "type" : "keyword" + }, + "country_iso_code" : { + "type" : "keyword" + }, + "location" : { + "type" : "geo_point" + }, + "region_name" : { + "type" : "keyword" + } + } + }, + "manufacturer" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "order_date" : { + "type" : "date" + }, + "order_id" : { + "type" : "keyword" + }, + "products" : { + "properties" : { + "_id" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "base_price" : { + "type" : "half_float" + }, + "base_unit_price" : { + "type" : "half_float" + }, + "category" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "created_on" : { + "type" : "date" + }, + "discount_amount" : { + "type" : "half_float" + }, + "discount_percentage" : { + "type" : "half_float" + }, + "manufacturer" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "min_price" : { + "type" : "half_float" + }, + "price" : { + "type" : "half_float" + }, + "product_id" : { + "type" : "long" + }, + "product_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + }, + "analyzer" : "english" + }, + "quantity" : { + "type" : "integer" + }, + "sku" : { + "type" : "keyword" + }, + "tax_amount" : { + "type" : "half_float" + }, + "taxful_price" : { + "type" : "half_float" + }, + "taxless_price" : { + "type" : "half_float" + }, + "unit_discount_amount" : { + "type" : "half_float" + } + } + }, + "sku" : { + "type" : "keyword" + }, + "taxful_total_price" : { + "type" : "half_float" + }, + "taxless_total_price" : { + "type" : "half_float" + }, + "total_quantity" : { + "type" : "integer" + }, + "total_unique_products" : { + "type" : "integer" + }, + "type" : { + "type" : "keyword" + }, + "user" : { + "type" : "keyword" + } + } + } } +ECOMMERCE_FILE_NAME = ROOT_DIR + '/ecommerce.json.gz' +ECOMMERCE_DF_FILE_NAME = ROOT_DIR + '/ecommerce_df.json.gz' + +TEST_MAPPING1 = { + 'mappings': { + 'properties': { + 'city': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + 'text': { + 'type': 'text', + 'fields': { + 'english': { + 'type': 'text', + 'analyzer': 'english' + } + } + }, + 'origin_location': { + 'properties': { + 'lat': { + 'type': 'text', + 'index_prefixes': {}, + 'fields': { + 'keyword': { + 'type': 'keyword', + 'ignore_above': 256 + } + } + }, + 'lon': { + 'type': 'text', + 'fields': { + 'keyword': { + 'type': 'keyword', + 'ignore_above': 256 + } + } + } + } + }, + 'maps-telemetry': { + 'properties': { + 'attributesPerMap': { + 'properties': { + 'dataSourcesCount': { + 'properties': { + 'avg': { + 'type': 'long' + }, + 'max': { + 'type': 'long' + }, + 'min': { + 'type': 'long' + } + } + }, + 'emsVectorLayersCount': { + 'dynamic': 'true', + 'properties': { + 'france_departments': { + 'properties': { + 'avg': { + 'type': 'float' + }, + 'max': { + 'type': 'long' + }, + 'min': { + 'type': 'long' + } + } + } + } + } + } + } + } + }, + 'type': { + 'type': 'keyword' + }, + 'name': { + 'type': 'text' + }, + 'user_name': { + 'type': 'keyword' + }, + 'email': { + 'type': 'keyword' + }, + 'content': { + 'type': 'text' + }, + 'tweeted_at': { + 'type': 'date' + }, + 'dest_location': { + 'type': 'geo_point' + }, + 'my_join_field': { + 'type': 'join', + 'relations': { + 'question': ['answer', 'comment'], + 'answer': 'vote' + } + } + } + } + } + +TEST_MAPPING1_INDEX_NAME = 'mapping1' + +TEST_MAPPING1_EXPECTED = { + 'city': 'text', + 'city.raw': 'keyword', + 'content': 'text', + 'dest_location': 'geo_point', + 'email': 'keyword', + 'maps-telemetry.attributesPerMap.dataSourcesCount.avg': 'long', + 'maps-telemetry.attributesPerMap.dataSourcesCount.max': 'long', + 'maps-telemetry.attributesPerMap.dataSourcesCount.min': 'long', + 'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.avg': 'float', + 'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.max': 'long', + 'maps-telemetry.attributesPerMap.emsVectorLayersCount.france_departments.min': 'long', + 'my_join_field': 'join', + 'name': 'text', + 'origin_location.lat': 'text', + 'origin_location.lat.keyword': 'keyword', + 'origin_location.lon': 'text', + 'origin_location.lon.keyword': 'keyword', + 'text': 'text', + 'text.english': 'text', + 'tweeted_at': 'date', + 'type': 'keyword', + 'user_name': 'keyword' +} + +TEST_MAPPING1_EXPECTED_DF = pd.DataFrame.from_dict(data=TEST_MAPPING1_EXPECTED, orient='index', columns=['es_dtype']) +TEST_MAPPING1_EXPECTED_SOURCE_FIELD_DF = TEST_MAPPING1_EXPECTED_DF.drop(index=['city.raw', + 'origin_location.lat.keyword', + 'origin_location.lon.keyword', + 'text.english']) +TEST_MAPPING1_EXPECTED_SOURCE_FIELD_COUNT = len(TEST_MAPPING1_EXPECTED_SOURCE_FIELD_DF.index) + +TEST_NESTED_USER_GROUP_INDEX_NAME = 'nested_user_group' +TEST_NESTED_USER_GROUP_MAPPING = { + 'mappings': { + 'properties': { + 'group': { + 'type': 'keyword' + }, + 'user': { + 'properties': { + 'first': { + 'type': 'keyword' + }, + 'last': { + 'type': 'keyword' + }, + 'address' : { + 'type' : 'keyword' + } + } + } + } +} +} + +TEST_NESTED_USER_GROUP_DOCS = [ +{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME, +'_source': + {'group':'amsterdam','user':[ + {'first':'Manke','last':'Nelis','address':['Elandsgracht', 'Amsterdam']}, + {'first':'Johnny','last':'Jordaan','address':['Elandsstraat', 'Amsterdam']}]}}, +{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME, +'_source': + {'group':'london','user':[ + {'first':'Alice','last':'Monkton'}, + {'first':'Jimmy','last':'White','address':['London']}]}}, +{'_index':TEST_NESTED_USER_GROUP_INDEX_NAME, +'_source':{'group':'new york','user':[ + {'first':'Bill','last':'Jones'}]}} +] + diff --git a/eland/tests/common.py b/eland/tests/common.py new file mode 100644 index 0000000..6243a4e --- /dev/null +++ b/eland/tests/common.py @@ -0,0 +1,60 @@ +import pytest + +import eland as ed + +import pandas as pd + +from pandas.util.testing import (assert_frame_equal) + +import os + +ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) + +# Create pandas and eland data frames +from eland.tests import ELASTICSEARCH_HOST +from eland.tests import FLIGHTS_DF_FILE_NAME, FLIGHTS_INDEX_NAME,\ + ECOMMERCE_DF_FILE_NAME, ECOMMERCE_INDEX_NAME + +_pd_flights = pd.read_json(FLIGHTS_DF_FILE_NAME).sort_index() +_pd_flights['timestamp'] = \ + pd.to_datetime(_pd_flights['timestamp']) +_pd_flights.index = _pd_flights.index.map(str) # make index 'object' not int +_ed_flights = ed.read_es(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME) + +_pd_ecommerce = pd.read_json(ECOMMERCE_DF_FILE_NAME).sort_index() +_pd_ecommerce['order_date'] = \ + pd.to_datetime(_pd_ecommerce['order_date']) +_pd_ecommerce['products.created_on'] = \ + _pd_ecommerce['products.created_on'].apply(lambda x: pd.to_datetime(x)) +_pd_ecommerce.insert(2, 'customer_birth_date', None) +_pd_ecommerce.index = _pd_ecommerce.index.map(str) # make index 'object' not int +_pd_ecommerce['customer_birth_date'].astype('datetime64') +_ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME) + +class TestData: + + def pd_flights(self): + return _pd_flights + + def ed_flights(self): + return _ed_flights + + def pd_ecommerce(self): + return _pd_ecommerce + + def ed_ecommerce(self): + return _ed_ecommerce + +def assert_pandas_eland_frame_equal(left, right): + if not isinstance(left, pd.DataFrame): + raise AssertionError("Expected type {exp_type}, found {act_type} instead", + exp_type=type(pd.DataFrame), act_type=type(left)) + + if not isinstance(right, ed.DataFrame): + raise AssertionError("Expected type {exp_type}, found {act_type} instead", + exp_type=type(ed.DataFrame), act_type=type(right)) + + # Use pandas tests to check similarity + assert_frame_equal(left, right._to_pandas()) + + diff --git a/eland/tests/dataframe/test_head_tail_pytest.py b/eland/tests/dataframe/test_head_tail_pytest.py index 1e3b7de..7cc6012 100644 --- a/eland/tests/dataframe/test_head_tail_pytest.py +++ b/eland/tests/dataframe/test_head_tail_pytest.py @@ -1,79 +1,82 @@ # File called _pytest for PyCharm compatability import pandas as pd -import io -import eland as ed +from eland.tests.common import TestData +from eland.tests.common import assert_pandas_eland_frame_equal -from pandas.util.testing import ( - assert_series_equal, assert_frame_equal) -class TestDataFrameHeadTail(): + +class TestDataFrameHeadTail(TestData): def test_head(self): - ed_flights = ed.read_es(es_params='localhost', index_pattern='flights') + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() - head_10 = ed_flights.head(10) - print(head_10._query_compiler._operations._to_es_query()) + ed_head_10 = ed_flights.head(10) + pd_head_10 = pd_flights.head(10) + assert_pandas_eland_frame_equal(pd_head_10, ed_head_10) - head_8 = head_10.head(8) - print(head_8._query_compiler._operations._to_es_query()) + ed_head_8 = ed_head_10.head(8) + pd_head_8 = pd_head_10.head(8) + assert_pandas_eland_frame_equal(pd_head_8, ed_head_8) - head_20 = head_10.head(20) - print(head_20._query_compiler._operations._to_es_query()) + ed_head_20 = ed_head_10.head(20) + pd_head_20 = pd_head_10.head(20) + assert_pandas_eland_frame_equal(pd_head_20, ed_head_20) def test_tail(self): - ed_flights = ed.read_es(es_params='localhost', index_pattern='flights') + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() - tail_10 = ed_flights.tail(10) - print(tail_10._query_compiler._operations._to_es_query()) - print(tail_10) + ed_tail_10 = ed_flights.tail(10) + pd_tail_10 = pd_flights.tail(10) + assert_pandas_eland_frame_equal(pd_tail_10, ed_tail_10) - tail_8 = tail_10.tail(8) - print(tail_8._query_compiler._operations._to_es_query()) + ed_tail_8 = ed_tail_10.tail(8) + pd_tail_8 = pd_tail_10.tail(8) + assert_pandas_eland_frame_equal(pd_tail_8, ed_tail_8) - tail_20 = tail_10.tail(20) - print(tail_20._query_compiler._operations._to_es_query()) + ed_tail_20 = ed_tail_10.tail(20) + pd_tail_20 = pd_tail_10.tail(20) + assert_pandas_eland_frame_equal(pd_tail_20, ed_tail_20) def test_head_tail(self): - ed_flights = ed.read_es(es_params='localhost', index_pattern='flights') + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() - head_10 = ed_flights.head(10) - print(head_10._query_compiler._operations._to_es_query()) + ed_head_10 = ed_flights.head(10) + pd_head_10 = pd_flights.head(10) + assert_pandas_eland_frame_equal(pd_head_10, ed_head_10) - tail_8 = head_10.tail(8) - print(tail_8._query_compiler._operations._to_es_query()) + ed_tail_8 = ed_head_10.tail(8) + pd_tail_8 = pd_head_10.tail(8) + assert_pandas_eland_frame_equal(pd_tail_8, ed_tail_8) - tail_5 = tail_8.tail(5) - print(tail_5._query_compiler._operations._to_es_query()) + ed_tail_5 = ed_tail_8.tail(5) + pd_tail_5 = pd_tail_8.tail(5) + assert_pandas_eland_frame_equal(pd_tail_5, ed_tail_5) - head_4 = tail_5.head(4) - print(head_4._query_compiler._operations._to_es_query()) + ed_tail_4 = ed_tail_5.tail(4) + pd_tail_4 = pd_tail_5.tail(4) + assert_pandas_eland_frame_equal(pd_tail_4, ed_tail_4) def test_tail_head(self): - ed_flights = ed.read_es(es_params='localhost', index_pattern='flights') + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() - tail_10 = ed_flights.tail(10) - print(tail_10._query_compiler._operations._to_es_query()) + ed_tail_10 = ed_flights.tail(10) + pd_tail_10 = pd_flights.tail(10) + assert_pandas_eland_frame_equal(pd_tail_10, ed_tail_10) - head_8 = tail_10.head(8) - print(head_8._query_compiler._operations._to_es_query()) + ed_head_8 = ed_tail_10.head(8) + pd_head_8 = pd_tail_10.head(8) + assert_pandas_eland_frame_equal(pd_head_8, ed_head_8) - head_5 = head_8.head(5) - print(head_5._query_compiler._operations._to_es_query()) + ed_tail_5 = ed_head_8.tail(5) + pd_tail_5 = pd_head_8.tail(5) + assert_pandas_eland_frame_equal(pd_tail_5, ed_tail_5) - tail_4 = head_5.tail(4) - print(tail_4._query_compiler._operations._to_es_query()) + ed_head_4 = ed_tail_5.head(4) + pd_head_4 = pd_tail_5.head(4) + assert_pandas_eland_frame_equal(pd_head_4, ed_head_4) - def test_head_tail_print(self): - ed_flights = ed.read_es(es_params='localhost', index_pattern='flights') - - tail_100 = ed_flights.tail(100) - print(tail_100._query_compiler._operations._to_es_query()) - print(tail_100) - - head_10 = tail_100.head(10) - print(head_10) - - tail_4 = head_10.tail(4) - print(tail_4._query_compiler._operations._to_es_query()) - print(tail_4) diff --git a/eland/tests/dataframe/test_repr_pytest.py b/eland/tests/dataframe/test_repr_pytest.py new file mode 100644 index 0000000..0f1e1cc --- /dev/null +++ b/eland/tests/dataframe/test_repr_pytest.py @@ -0,0 +1,43 @@ +# File called _pytest for PyCharm compatability + +from eland.tests.common import TestData + + +class TestDataFrameHeadTail(TestData): + + def test_to_string1(self): + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() + + ed_head_101 = ed_flights.head(101) + pd_head_101 = pd_flights.head(101) + + # This sets max_rows=60 by default + ed_head_101_str = ed_head_101.to_string() + pd_head_101_str = pd_head_101.to_string(max_rows=60) + + assert pd_head_101_str == ed_head_101_str + + def test_to_string2(self): + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() + + ed_head_11 = ed_flights.head(11) + pd_head_11 = pd_flights.head(11) + + ed_head_11_str = ed_head_11.to_string(max_rows=60) + pd_head_11_str = pd_head_11.to_string(max_rows=60) + + assert pd_head_11_str == ed_head_11_str + + def test_to_repr(self): + ed_ecommerce = self.ed_ecommerce() + pd_ecommerce = self.pd_ecommerce() + + ed_head_18 = ed_ecommerce.head(18) + pd_head_18 = pd_ecommerce.head(18) + + ed_head_18_repr = repr(ed_head_18) + pd_head_18_repr = repr(pd_head_18) + + assert ed_head_18_repr == pd_head_18_repr diff --git a/eland/tests/ecommerce.json.gz b/eland/tests/ecommerce.json.gz new file mode 100644 index 0000000..b1a5dff Binary files /dev/null and b/eland/tests/ecommerce.json.gz differ diff --git a/eland/tests/ecommerce_df.json.gz b/eland/tests/ecommerce_df.json.gz new file mode 100644 index 0000000..11f5a98 Binary files /dev/null and b/eland/tests/ecommerce_df.json.gz differ diff --git a/eland/tests/flights.json.gz b/eland/tests/flights.json.gz new file mode 100644 index 0000000..df976e6 Binary files /dev/null and b/eland/tests/flights.json.gz differ diff --git a/eland/tests/flights_df.json.gz b/eland/tests/flights_df.json.gz new file mode 100644 index 0000000..5aed61e Binary files /dev/null and b/eland/tests/flights_df.json.gz differ diff --git a/eland/tests/setup_tests.py b/eland/tests/setup_tests.py new file mode 100644 index 0000000..b60fa1e --- /dev/null +++ b/eland/tests/setup_tests.py @@ -0,0 +1,69 @@ +import pandas as pd +from elasticsearch import Elasticsearch +from elasticsearch import helpers + +from eland.tests import * + +DATA_LIST = [ + (FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, FLIGHTS_MAPPING), + (ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME, ECOMMERCE_MAPPING) +] + +def _setup_data(es): + # Read json file and index records into Elasticsearch + for data in DATA_LIST: + json_file_name = data[0] + index_name = data[1] + mapping = data[2] + + # Delete index + print("Deleting index:", index_name) + es.indices.delete(index=index_name, ignore=[400, 404]) + print("Creating index:", index_name) + es.indices.create(index=index_name, body=mapping) + + df = pd.read_json(json_file_name, lines=True) + + actions = [] + n = 0 + + print("Adding", df.shape[0], "items to index:", index_name) + for index, row in df.iterrows(): + values = row.to_dict() + # make timestamp datetime 2018-01-01T12:09:35 + #values['timestamp'] = datetime.strptime(values['timestamp'], '%Y-%m-%dT%H:%M:%S') + + # Use integer as id field for repeatable results + action = {'_index': index_name, '_source': values, '_id': str(n)} + + actions.append(action) + + n = n + 1 + + if n % 10000 == 0: + helpers.bulk(es, actions) + actions = [] + + helpers.bulk(es, actions) + actions = [] + + print("Done", index_name) + +def _setup_test_mappings(es): + # Create a complex mapping containing many Elasticsearch features + es.indices.delete(index=TEST_MAPPING1_INDEX_NAME, ignore=[400, 404]) + es.indices.create(index=TEST_MAPPING1_INDEX_NAME, body=TEST_MAPPING1) + +def _setup_test_nested(es): + es.indices.delete(index=TEST_NESTED_USER_GROUP_INDEX_NAME, ignore=[400, 404]) + es.indices.create(index=TEST_NESTED_USER_GROUP_INDEX_NAME, body=TEST_NESTED_USER_GROUP_MAPPING) + + helpers.bulk(es, TEST_NESTED_USER_GROUP_DOCS) + +if __name__ == '__main__': + # Create connection to Elasticsearch - use defaults + es = Elasticsearch(ELASTICSEARCH_HOST) + + _setup_data(es) + _setup_test_mappings(es) + _setup_test_nested(es)