diff --git a/.gitignore b/.gitignore
index df00727..5091e95 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,4 +11,7 @@
 .idea/
 
 # pytest files
-.pytest_cache/1
+.pytest_cache/
+
+# Ignore MacOSX files
+.DS_Store
diff --git a/eland/DataFrame.py b/eland/DataFrame.py
index 5251c51..e840c63 100644
--- a/eland/DataFrame.py
+++ b/eland/DataFrame.py
@@ -3,6 +3,8 @@ import eland
 from elasticsearch import Elasticsearch
 from elasticsearch_dsl import Search
 
+import json
+
 import pandas as pd
 
 class DataFrame():
@@ -16,19 +18,21 @@ class DataFrame():
     @staticmethod
     def _flatten_results(prefix, results, result):
         # TODO
+        return prefix
 
     @staticmethod
     def _es_results_to_pandas(results):
         # TODO - resolve nested fields
         rows = []
         for hit in results['hits']['hits']:
-            row = {}
-            for k in hit.keys():
-                if k == '_source':
-                    row.update(hit['_source'])
+            row = hit['_source']
             rows.append(row)
-        return pd.DataFrame(data=rows)
-
+        #return pd.DataFrame(data=rows)
+        # Converting the list of dicts to a dataframe doesn't convert datetimes
+        # effectively compared to read_json. TODO - https://github.com/elastic/eland/issues/2
+        json_rows = json.dumps(rows)
+        return pd.read_json(json_rows)
+
     @staticmethod
     def _flatten_mapping(prefix, properties, result):
         for k, v in properties.items():
@@ -59,7 +63,7 @@ class DataFrame():
 
     def head(self, n=5):
         results = self.client.search(index=self.index_pattern, size=n)
-
+
         return DataFrame._es_results_to_pandas(results)
 
     def describe(self):
diff --git a/eland/tests/__init__.py b/eland/tests/__init__.py
new file mode 100644
index 0000000..0d8dfdf
--- /dev/null
+++ b/eland/tests/__init__.py
@@ -0,0 +1,12 @@
+import os
+
+ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
+
+# Define test files and indices
+ELASTICSEARCH_HOST = 'localhost' # TODO externalise this
+
+FLIGHTS_INDEX_NAME = 'flights'
+FLIGHTS_FILE_NAME = ROOT_DIR + '/flights.json.gz'
+
+ECOMMERCE_INDEX_NAME = 'ecommerce'
+ECOMMERCE_FILE_NAME = ROOT_DIR + '/ecommerce.json.gz'
diff --git a/tests/__init__.py b/eland/tests/dataframe/__init__.py
similarity index 100%
rename from tests/__init__.py
rename to eland/tests/dataframe/__init__.py
diff --git a/eland/tests/dataframe/common.py b/eland/tests/dataframe/common.py
new file mode 100644
index 0000000..d0b1ef0
--- /dev/null
+++ b/eland/tests/dataframe/common.py
@@ -0,0 +1,33 @@
+import pytest
+
+import eland as ed
+
+import pandas as pd
+
+import os
+
+ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
+
+# Create pandas and eland data frames
+from eland.tests import ELASTICSEARCH_HOST
+from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME
+
+_pd_flights = pd.read_json(FLIGHTS_FILE_NAME, lines=True)
+_ed_flights = ed.read_es(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME)
+
+_pd_ecommerce = pd.read_json(ECOMMERCE_FILE_NAME, lines=True)
+_ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME)
+
+class TestData:
+
+    def pd_flights(self):
+        return _pd_flights
+
+    def ed_flights(self):
+        return _ed_flights
+
+    def pd_ecommerce(self):
+        return _pd_ecommerce
+
+    def ed_ecommerce(self):
+        return _ed_ecommerce
diff --git a/eland/tests/dataframe/test_indexing_pytest.py b/eland/tests/dataframe/test_indexing_pytest.py
new file mode 100644
index 0000000..7fb94e7
--- /dev/null
+++ b/eland/tests/dataframe/test_indexing_pytest.py
@@ -0,0 +1,24 @@
+# File called _pytest for PyCharm compatibility
+from eland.tests.dataframe.common import TestData
+
+from pandas.util.testing import (
+    assert_almost_equal, assert_frame_equal, assert_series_equal)
+
+class TestDataFrameIndexing(TestData):
+
+    def test_head(self):
+        pd_flights_head = self.pd_flights().head()
+        ed_flights_head = self.ed_flights().head()
+
+        assert_frame_equal(pd_flights_head, ed_flights_head)
+
+        pd_ecommerce_head = self.pd_ecommerce().head()
+        ed_ecommerce_head = self.ed_ecommerce().head()
+
+        #print(pd_ecommerce_head.dtypes)
+        #print(ed_ecommerce_head.dtypes)
+
+        assert_frame_equal(pd_ecommerce_head, ed_ecommerce_head)
+
+
+
diff --git a/eland/tests/ecommerce.json.gz b/eland/tests/ecommerce.json.gz
new file mode 100644
index 0000000..b1a5dff
Binary files /dev/null and b/eland/tests/ecommerce.json.gz differ
diff --git a/tests/flights.json.gz b/eland/tests/flights.json.gz
similarity index 100%
rename from tests/flights.json.gz
rename to eland/tests/flights.json.gz
diff --git a/eland/tests/index_flights.py b/eland/tests/index_flights.py
new file mode 100644
index 0000000..20d7380
--- /dev/null
+++ b/eland/tests/index_flights.py
@@ -0,0 +1,51 @@
+import pandas as pd
+from elasticsearch import Elasticsearch
+from elasticsearch import helpers
+
+from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME
+
+
+DATA_LIST = [
+    (FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME),
+    (ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME)
+]
+
+if __name__ == '__main__':
+
+    # Read json file and index records into Elasticsearch
+    for data in DATA_LIST:
+        json_file_name = data[0]
+        index_name = data[1]
+
+        # Create connection to Elasticsearch - use defaults
+        es = Elasticsearch()
+
+        # Delete index
+        print("Deleting index:", index_name)
+        es.indices.delete(index=index_name, ignore=[400, 404])
+
+        df = pd.read_json(json_file_name, lines=True)
+
+        actions = []
+        n = 0
+
+        print("Adding", df.shape[0], "items to index:", index_name)
+        for index, row in df.iterrows():
+            values = row.to_dict()
+            # make timestamp datetime 2018-01-01T12:09:35
+            #values['timestamp'] = datetime.strptime(values['timestamp'], '%Y-%m-%dT%H:%M:%S')
+
+            action = {'_index': index_name, '_source': values}
+
+            actions.append(action)
+
+            n = n + 1
+
+            if n % 10000 == 0:
+                helpers.bulk(es, actions)
+                actions = []
+
+        helpers.bulk(es, actions)
+        actions = []
+
+        print("Done", index_name)
diff --git a/tests/test.ipynb b/eland/tests/test.ipynb
similarity index 100%
rename from tests/test.ipynb
rename to eland/tests/test.ipynb
diff --git a/tests/dataframe/__init__.py b/tests/dataframe/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/tests/dataframe/common.py b/tests/dataframe/common.py
deleted file mode 100644
index 2258c0b..0000000
--- a/tests/dataframe/common.py
+++ /dev/null
@@ -1,16 +0,0 @@
-import eland as ed
-
-import pandas as pd
-
-# Create pandas and eland data frames
-_pd_df = pd.read_json('flights.json.gz', lines=True)
-_ed_df = ed.read_es('localhost', 'flights')
-
-class TestData:
-
-    def pandas_frame(self):
-        return _pd_df
-
-    def eland_frame(self):
-        return _ed_df
-
diff --git a/tests/dataframe/test_indexing.py b/tests/dataframe/test_indexing.py
deleted file mode 100644
index 3ad2659..0000000
--- a/tests/dataframe/test_indexing.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import pytest
-
-import eland as ed
-
-from eland.tests.dataframe.common import TestData
-
-from pandas.util.testing import (
-    assert_almost_equal, assert_frame_equal, assert_series_equal)
-
-class TestDataFrameIndexing(TestData):
-
-    def test_head(self):
-        pd_head = self.pd_df.head()
-        ed_head = self.ed_df.head()
-
-        assert_frame_equal(pd_head, ed_head)
-
-
-
diff --git a/tests/index_flights.py b/tests/index_flights.py
deleted file mode 100644
index a1cb304..0000000
--- a/tests/index_flights.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import pandas as pd
-from elasticsearch import Elasticsearch
-from elasticsearch import helpers
-
-if __name__ == '__main__':
-
-    # Read json file and index records into Elasticsearch
-    json_file_name = 'flights.json.gz'
-    index_name = 'flights'
-
-    # Create connection to Elasticsearch - use defaults1
-    es = Elasticsearch()
-
-    # Delete index
-    print("Deleting index:", index_name)
-    es.indices.delete(index=index_name, ignore=[400, 404])
-
-    df = pd.read_json(json_file_name, lines=True)
-
-    actions = []
-    n = 0
-
-    print("Adding", df.shape[0], "items to index:", index_name)
-    for index, row in df.iterrows():
-        action = {"_index": index_name, '_source': row.to_dict()}
-
-        actions.append(action)
-
-        n = n + 1
-
-        if n % 10000 == 0:
-            helpers.bulk(es, actions)
-            actions = []
-
-    helpers.bulk(es, actions)
-    actions = []
-
-    print("Done", es.cat.indices(index_name))
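
Note on the _es_results_to_pandas hunk in eland/DataFrame.py: the change swaps direct pd.DataFrame(data=rows) construction for a json.dumps / pd.read_json round trip because, per the in-code comment, building the frame from a list of dicts leaves datetime fields as plain strings, while read_json applies pandas' date-conversion heuristics to date-like column names such as timestamp. This also matches how the pandas-side test fixtures are loaded in common.py (pd.read_json(..., lines=True)). A minimal sketch of the difference, using made-up rows rather than data from the flights/ecommerce indices (behaviour of pandas versions current at the time; newer pandas warns about passing literal JSON strings):

    import json
    import pandas as pd

    rows = [{"timestamp": "2018-01-01T12:09:35", "value": 1},
            {"timestamp": "2018-01-01T12:09:36", "value": 2}]

    # Direct construction keeps the ISO-8601 strings as an object column.
    print(pd.DataFrame(data=rows).dtypes)         # timestamp -> object

    # read_json parses date-like column names, giving datetime64[ns],
    # so assert_frame_equal against the read_json-loaded fixtures can pass.
    print(pd.read_json(json.dumps(rows)).dtypes)  # timestamp -> datetime64[ns]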