mirror of
https://github.com/elastic/eland.git
synced 2025-07-11 00:02:14 +08:00
Add tests (pytest) and clean up package
pytest selected as it's used in pandas and dask
This commit is contained in:
parent
5b9a6b3d63
commit
0aceac31b0
5
.gitignore
vendored
5
.gitignore
vendored
@ -11,4 +11,7 @@
|
||||
.idea/
|
||||
|
||||
# pytest files
|
||||
.pytest_cache/1
|
||||
.pytest_cache/
|
||||
|
||||
# Ignore MacOSX files
|
||||
.DS_Store
|
||||
|
@ -3,6 +3,8 @@ import eland
|
||||
from elasticsearch import Elasticsearch
|
||||
from elasticsearch_dsl import Search
|
||||
|
||||
import json
|
||||
|
||||
import pandas as pd
|
||||
|
||||
class DataFrame():
|
||||
@ -16,19 +18,21 @@ class DataFrame():
|
||||
@staticmethod
|
||||
def _flatten_results(prefix, results, result):
|
||||
# TODO
|
||||
return prefix
|
||||
|
||||
@staticmethod
|
||||
def _es_results_to_pandas(results):
|
||||
# TODO - resolve nested fields
|
||||
rows = []
|
||||
for hit in results['hits']['hits']:
|
||||
row = {}
|
||||
for k in hit.keys():
|
||||
if k == '_source':
|
||||
row.update(hit['_source'])
|
||||
row = hit['_source']
|
||||
rows.append(row)
|
||||
return pd.DataFrame(data=rows)
|
||||
|
||||
#return pd.DataFrame(data=rows)
|
||||
# Converting the list of dicts to a dataframe doesn't convert datetimes
|
||||
# effectively compared to read_json. TODO - https://github.com/elastic/eland/issues/2
|
||||
json_rows = json.dumps(rows)
|
||||
return pd.read_json(json_rows)
|
||||
|
||||
@staticmethod
|
||||
def _flatten_mapping(prefix, properties, result):
|
||||
for k, v in properties.items():
|
||||
@ -59,7 +63,7 @@ class DataFrame():
|
||||
|
||||
def head(self, n=5):
|
||||
results = self.client.search(index=self.index_pattern, size=n)
|
||||
|
||||
|
||||
return DataFrame._es_results_to_pandas(results)
|
||||
|
||||
def describe(self):
|
||||
|
12
eland/tests/__init__.py
Normal file
12
eland/tests/__init__.py
Normal file
@ -0,0 +1,12 @@
|
||||
import os
|
||||
|
||||
# Absolute path of the directory holding this package; the test data file
# names below are built relative to it.
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))

# Elasticsearch host used by the tests.
ELASTICSEARCH_HOST = "localhost"  # TODO externalise this

# Flights data set: index name and gzipped JSON source file.
FLIGHTS_INDEX_NAME = "flights"
FLIGHTS_FILE_NAME = "/".join((ROOT_DIR, "flights.json.gz"))

# Ecommerce data set: index name and gzipped JSON source file.
ECOMMERCE_INDEX_NAME = "ecommerce"
ECOMMERCE_FILE_NAME = "/".join((ROOT_DIR, "ecommerce.json.gz"))
|
33
eland/tests/dataframe/common.py
Normal file
33
eland/tests/dataframe/common.py
Normal file
@ -0,0 +1,33 @@
|
||||
import pytest
|
||||
|
||||
import eland as ed
|
||||
|
||||
import pandas as pd
|
||||
|
||||
import os
|
||||
|
||||
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
# Create pandas and eland data frames
|
||||
from eland.tests import ELASTICSEARCH_HOST
|
||||
from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME
|
||||
|
||||
_pd_flights = pd.read_json(FLIGHTS_FILE_NAME, lines=True)
|
||||
_ed_flights = ed.read_es(ELASTICSEARCH_HOST, FLIGHTS_INDEX_NAME)
|
||||
|
||||
_pd_ecommerce = pd.read_json(ECOMMERCE_FILE_NAME, lines=True)
|
||||
_ed_ecommerce = ed.read_es(ELASTICSEARCH_HOST, ECOMMERCE_INDEX_NAME)
|
||||
|
||||
class TestData:
    """Base class exposing the shared pandas and eland test frames.

    Every accessor hands back a module-level object, so all tests that
    inherit from this class work against the same instances.
    """

    def pd_flights(self):
        """Return the pandas frame read from the flights file."""
        return _pd_flights

    def ed_flights(self):
        """Return the object ``ed.read_es`` produced for the flights index."""
        return _ed_flights

    def pd_ecommerce(self):
        """Return the pandas frame read from the ecommerce file."""
        return _pd_ecommerce

    def ed_ecommerce(self):
        """Return the object ``ed.read_es`` produced for the ecommerce index."""
        return _ed_ecommerce
|
24
eland/tests/dataframe/test_indexing_pytest.py
Normal file
24
eland/tests/dataframe/test_indexing_pytest.py
Normal file
@ -0,0 +1,24 @@
|
||||
# File called _pytest for PyCharm compatibility
|
||||
from eland.tests.dataframe.common import TestData
|
||||
|
||||
from pandas.util.testing import (
|
||||
assert_almost_equal, assert_frame_equal, assert_series_equal)
|
||||
|
||||
class TestDataFrameIndexing(TestData):
    """Compare indexing operations on eland frames against pandas."""

    def test_head(self):
        """``head()`` must give equal frames for pandas and eland."""
        # (pandas accessor, eland accessor) for each data set, checked in
        # this order: flights first, then ecommerce.
        frame_pairs = (
            (self.pd_flights, self.ed_flights),
            (self.pd_ecommerce, self.ed_ecommerce),
        )

        for pd_accessor, ed_accessor in frame_pairs:
            expected_head = pd_accessor().head()
            actual_head = ed_accessor().head()

            # When this fails, printing ``.dtypes`` of both heads is a
            # quick way to see how the frames differ.
            assert_frame_equal(expected_head, actual_head)
|
||||
|
||||
|
||||
|
BIN
eland/tests/ecommerce.json.gz
Normal file
BIN
eland/tests/ecommerce.json.gz
Normal file
Binary file not shown.
51
eland/tests/index_flights.py
Normal file
51
eland/tests/index_flights.py
Normal file
@ -0,0 +1,51 @@
|
||||
import pandas as pd
|
||||
from elasticsearch import Elasticsearch
|
||||
from elasticsearch import helpers
|
||||
|
||||
from eland.tests import FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME, ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME
|
||||
|
||||
|
||||
DATA_LIST = [
|
||||
(FLIGHTS_FILE_NAME, FLIGHTS_INDEX_NAME),
|
||||
(ECOMMERCE_FILE_NAME, ECOMMERCE_INDEX_NAME)
|
||||
]
|
||||
|
||||
if __name__ == '__main__':
    # Load every (json file, index name) pair in DATA_LIST into Elasticsearch,
    # replacing any index of the same name that already exists.

    # Number of documents sent per bulk request.
    bulk_chunk_size = 10000

    # Create connection to Elasticsearch - use defaults. The same client
    # serves every data set, so it is created once rather than per file.
    es = Elasticsearch()

    for json_file_name, index_name in DATA_LIST:
        # Delete index; 400/404 responses are ignored so that a missing
        # index does not abort the run.
        print("Deleting index:", index_name)
        es.indices.delete(index=index_name, ignore=[400, 404])

        # Read json file (one record per line)
        df = pd.read_json(json_file_name, lines=True)

        print("Adding", df.shape[0], "items to index:", index_name)

        actions = []
        for _, row in df.iterrows():
            actions.append({'_index': index_name, '_source': row.to_dict()})

            # Send a full batch as soon as it is complete so the pending
            # list never grows beyond bulk_chunk_size entries.
            if len(actions) >= bulk_chunk_size:
                helpers.bulk(es, actions)
                actions = []

        # Index the remaining partial batch, if there is one.
        if actions:
            helpers.bulk(es, actions)

        print("Done", index_name)
|
@ -1,16 +0,0 @@
|
||||
import eland as ed
|
||||
|
||||
import pandas as pd
|
||||
|
||||
# Create pandas and eland data frames
|
||||
_pd_df = pd.read_json('flights.json.gz', lines=True)
|
||||
_ed_df = ed.read_es('localhost', 'flights')
|
||||
|
||||
class TestData:
|
||||
|
||||
def pandas_frame(self):
|
||||
return _pd_df
|
||||
|
||||
def eland_frame(self):
|
||||
return _ed_df
|
||||
|
@ -1,19 +0,0 @@
|
||||
import pytest
|
||||
|
||||
import eland as ed
|
||||
|
||||
from eland.tests.dataframe.common import TestData
|
||||
|
||||
from pandas.util.testing import (
|
||||
assert_almost_equal, assert_frame_equal, assert_series_equal)
|
||||
|
||||
class TestDataFrameIndexing(TestData):
|
||||
|
||||
def test_head(self):
|
||||
pd_head = self.pd_df.head()
|
||||
ed_head = self.ed_df.head()
|
||||
|
||||
assert_frame_equal(pd_head, ed_head)
|
||||
|
||||
|
||||
|
@ -1,38 +0,0 @@
|
||||
import pandas as pd
|
||||
from elasticsearch import Elasticsearch
|
||||
from elasticsearch import helpers
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
# Read json file and index records into Elasticsearch
|
||||
json_file_name = 'flights.json.gz'
|
||||
index_name = 'flights'
|
||||
|
||||
# Create connection to Elasticsearch - use defaults
|
||||
es = Elasticsearch()
|
||||
|
||||
# Delete index
|
||||
print("Deleting index:", index_name)
|
||||
es.indices.delete(index=index_name, ignore=[400, 404])
|
||||
|
||||
df = pd.read_json(json_file_name, lines=True)
|
||||
|
||||
actions = []
|
||||
n = 0
|
||||
|
||||
print("Adding", df.shape[0], "items to index:", index_name)
|
||||
for index, row in df.iterrows():
|
||||
action = {"_index": index_name, '_source': row.to_dict()}
|
||||
|
||||
actions.append(action)
|
||||
|
||||
n = n + 1
|
||||
|
||||
if n % 10000 == 0:
|
||||
helpers.bulk(es, actions)
|
||||
actions = []
|
||||
|
||||
helpers.bulk(es, actions)
|
||||
actions = []
|
||||
|
||||
print("Done", es.cat.indices(index_name))
|
Loading…
x
Reference in New Issue
Block a user