Allow user to specify es data types in read_csv and pandas_to_eland (#181)

* Allow user to specify es data types in read_csv and pandas_to_eland

Also, some minor maintenance modifications:

- replaced pandas.util.testing with pandas.testing (required in 1.x)
- updated elasticsearch-py requirements to 7.6+ (to support ML code)

* linting file
This commit is contained in:
Stephen Dodson 2020-04-14 17:04:12 +02:00 committed by GitHub
parent e1cacead44
commit 50734f8bd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 103 additions and 60 deletions

View File

@ -403,13 +403,16 @@ class FieldMappings:
return es_dtype return es_dtype
@staticmethod @staticmethod
def _generate_es_mappings(dataframe, geo_points=None): def _generate_es_mappings(dataframe, es_type_overrides=None):
"""Given a pandas dataframe, generate the associated Elasticsearch mapping """Given a pandas dataframe, generate the associated Elasticsearch mapping
Parameters Parameters
---------- ----------
dataframe : pandas.DataFrame dataframe : pandas.DataFrame
pandas.DataFrame to create schema from pandas.DataFrame to create schema from
es_type_overrides : dict
Dictionary of Elasticsearch types to override defaults for certain fields
(e.g. { 'location': 'geo_point' })
Returns Returns
------- -------
@ -437,8 +440,8 @@ class FieldMappings:
mappings = {"properties": {}} mappings = {"properties": {}}
for field_name_name, dtype in dataframe.dtypes.iteritems(): for field_name_name, dtype in dataframe.dtypes.iteritems():
if geo_points is not None and field_name_name in geo_points: if es_type_overrides is not None and field_name_name in es_type_overrides:
es_dtype = "geo_point" es_dtype = es_type_overrides[field_name_name]
else: else:
es_dtype = FieldMappings._pd_dtype_to_es_dtype(dtype) es_dtype = FieldMappings._pd_dtype_to_es_dtype(dtype)

View File

@ -15,7 +15,7 @@
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
import numpy as np import numpy as np
from pandas.util.testing import assert_almost_equal from pandas.testing import assert_frame_equal
from eland.tests.common import TestData from eland.tests.common import TestData
@ -31,7 +31,7 @@ class TestDataFrameAggs(TestData):
# Eland returns all float values for all metric aggs, pandas can return int # Eland returns all float values for all metric aggs, pandas can return int
# TODO - investigate this more # TODO - investigate this more
pd_sum_min = pd_sum_min.astype("float64") pd_sum_min = pd_sum_min.astype("float64")
assert_almost_equal(pd_sum_min, ed_sum_min) assert_frame_equal(pd_sum_min, ed_sum_min, check_exact=False)
pd_sum_min_std = pd_flights.select_dtypes(include=[np.number]).agg( pd_sum_min_std = pd_flights.select_dtypes(include=[np.number]).agg(
["sum", "min", "std"] ["sum", "min", "std"]
@ -43,7 +43,9 @@ class TestDataFrameAggs(TestData):
print(pd_sum_min_std.dtypes) print(pd_sum_min_std.dtypes)
print(ed_sum_min_std.dtypes) print(ed_sum_min_std.dtypes)
assert_almost_equal(pd_sum_min_std, ed_sum_min_std, check_less_precise=True) assert_frame_equal(
pd_sum_min_std, ed_sum_min_std, check_exact=False, check_less_precise=True
)
def test_terms_aggs(self): def test_terms_aggs(self):
pd_flights = self.pd_flights() pd_flights = self.pd_flights()
@ -55,7 +57,7 @@ class TestDataFrameAggs(TestData):
# Eland returns all float values for all metric aggs, pandas can return int # Eland returns all float values for all metric aggs, pandas can return int
# TODO - investigate this more # TODO - investigate this more
pd_sum_min = pd_sum_min.astype("float64") pd_sum_min = pd_sum_min.astype("float64")
assert_almost_equal(pd_sum_min, ed_sum_min) assert_frame_equal(pd_sum_min, ed_sum_min, check_exact=False)
pd_sum_min_std = pd_flights.select_dtypes(include=[np.number]).agg( pd_sum_min_std = pd_flights.select_dtypes(include=[np.number]).agg(
["sum", "min", "std"] ["sum", "min", "std"]
@ -67,7 +69,9 @@ class TestDataFrameAggs(TestData):
print(pd_sum_min_std.dtypes) print(pd_sum_min_std.dtypes)
print(ed_sum_min_std.dtypes) print(ed_sum_min_std.dtypes)
assert_almost_equal(pd_sum_min_std, ed_sum_min_std, check_less_precise=True) assert_frame_equal(
pd_sum_min_std, ed_sum_min_std, check_exact=False, check_less_precise=True
)
def test_aggs_median_var(self): def test_aggs_median_var(self):
pd_ecommerce = self.pd_ecommerce() pd_ecommerce = self.pd_ecommerce()
@ -86,4 +90,4 @@ class TestDataFrameAggs(TestData):
# Eland returns all float values for all metric aggs, pandas can return int # Eland returns all float values for all metric aggs, pandas can return int
# TODO - investigate this more # TODO - investigate this more
pd_aggs = pd_aggs.astype("float64") pd_aggs = pd_aggs.astype("float64")
assert_almost_equal(pd_aggs, ed_aggs, check_less_precise=2) assert_frame_equal(pd_aggs, ed_aggs, check_exact=False, check_less_precise=2)

View File

@ -14,7 +14,7 @@
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
from pandas.util.testing import assert_series_equal from pandas.testing import assert_series_equal
from eland.tests.common import TestData from eland.tests.common import TestData

View File

@ -17,7 +17,7 @@ from datetime import datetime
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from pandas.util.testing import assert_series_equal from pandas.testing import assert_series_equal
import eland as ed import eland as ed
from eland.field_mappings import FieldMappings from eland.field_mappings import FieldMappings

View File

@ -14,7 +14,7 @@
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
from pandas.util.testing import assert_almost_equal from pandas.testing import assert_frame_equal
from eland.tests.common import TestData from eland.tests.common import TestData
@ -27,9 +27,10 @@ class TestDataFrameDescribe(TestData):
pd_describe = pd_flights.describe() pd_describe = pd_flights.describe()
ed_describe = ed_flights.describe() ed_describe = ed_flights.describe()
assert_almost_equal( assert_frame_equal(
pd_describe.drop(["25%", "50%", "75%"], axis="index"), pd_describe.drop(["25%", "50%", "75%"], axis="index"),
ed_describe.drop(["25%", "50%", "75%"], axis="index"), ed_describe.drop(["25%", "50%", "75%"], axis="index"),
check_exact=False,
check_less_precise=True, check_less_precise=True,
) )

View File

@ -15,7 +15,7 @@
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
import numpy as np import numpy as np
from pandas.util.testing import assert_series_equal from pandas.testing import assert_series_equal
from eland.tests.common import TestData from eland.tests.common import TestData
from eland.tests.common import assert_pandas_eland_frame_equal from eland.tests.common import assert_pandas_eland_frame_equal

View File

@ -16,7 +16,7 @@
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from pandas.util.testing import assert_almost_equal from pandas.testing import assert_frame_equal
from eland.tests.common import TestData from eland.tests.common import TestData
@ -52,8 +52,8 @@ class TestDataFrameHist(TestData):
]._hist(num_bins=num_bins) ]._hist(num_bins=num_bins)
# Numbers are slightly different # Numbers are slightly different
assert_almost_equal(pd_bins, ed_bins) assert_frame_equal(pd_bins, ed_bins, check_exact=False)
assert_almost_equal(pd_weights, ed_weights) assert_frame_equal(pd_weights, ed_weights, check_exact=False)
def test_flights_filtered_hist(self): def test_flights_filtered_hist(self):
pd_flights = self.pd_flights() pd_flights = self.pd_flights()
@ -88,5 +88,5 @@ class TestDataFrameHist(TestData):
]._hist(num_bins=num_bins) ]._hist(num_bins=num_bins)
# Numbers are slightly different # Numbers are slightly different
assert_almost_equal(pd_bins, ed_bins) assert_frame_equal(pd_bins, ed_bins, check_exact=False)
assert_almost_equal(pd_weights, ed_weights) assert_frame_equal(pd_weights, ed_weights, check_exact=False)

View File

@ -14,7 +14,7 @@
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
from pandas.util.testing import assert_series_equal from pandas.testing import assert_series_equal
from eland.tests.common import TestData from eland.tests.common import TestData

View File

@ -18,7 +18,7 @@ import ast
import time import time
import pandas as pd import pandas as pd
from pandas.util.testing import assert_frame_equal from pandas.testing import assert_frame_equal
import eland as ed import eland as ed
from eland.tests import ES_TEST_CLIENT from eland.tests import ES_TEST_CLIENT
@ -81,7 +81,10 @@ class TestDataFrameToCSV(TestData):
test_index, test_index,
index_col=0, index_col=0,
es_refresh=True, es_refresh=True,
es_geo_points=["OriginLocation", "DestLocation"], es_type_overrides={
"OriginLocation": "geo_point",
"DestLocation": "geo_point",
},
converters={ converters={
"DestLocation": lambda x: ast.literal_eval(x), "DestLocation": lambda x: ast.literal_eval(x),
"OriginLocation": lambda x: ast.literal_eval(x), "OriginLocation": lambda x: ast.literal_eval(x),

View File

@ -78,6 +78,8 @@ class TestDataFrameUtils(TestData):
"E": [1.0, 2.0, 3.0], "E": [1.0, 2.0, 3.0],
"F": False, "F": False,
"G": [1, 2, 3], "G": [1, 2, 3],
"H": "Long text", # text
"I": "52.36,4.83", # geo point
}, },
index=["0", "1", "2"], index=["0", "1", "2"],
) )
@ -92,7 +94,33 @@ class TestDataFrameUtils(TestData):
es_if_exists="replace", es_if_exists="replace",
es_refresh=True, es_refresh=True,
use_pandas_index_for_es_ids=False, use_pandas_index_for_es_ids=False,
es_type_overrides={"H": "text", "I": "geo_point"},
) )
# Check types
expected_mapping = {
"test_pandas_to_eland_ignore_index": {
"mappings": {
"properties": {
"A": {"type": "double"},
"B": {"type": "long"},
"C": {"type": "keyword"},
"D": {"type": "date"},
"E": {"type": "double"},
"F": {"type": "boolean"},
"G": {"type": "long"},
"H": {"type": "text"},
"I": {"type": "geo_point"},
}
}
}
}
mapping = ES_TEST_CLIENT.indices.get_mapping(index_name)
assert expected_mapping == mapping
# Convert back to pandas and compare with original
pd_df = ed.eland_to_pandas(ed_df) pd_df = ed.eland_to_pandas(ed_df)
# Compare values excluding index # Compare values excluding index

View File

@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
from pandas.util.testing import assert_series_equal from pandas.testing import assert_series_equal
from eland.field_mappings import FieldMappings from eland.field_mappings import FieldMappings
from eland.tests import ES_TEST_CLIENT, FLIGHTS_INDEX_NAME from eland.tests import ES_TEST_CLIENT, FLIGHTS_INDEX_NAME

View File

@ -14,7 +14,7 @@
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
import pytest import pytest
from pandas.util.testing import assert_series_equal from pandas.testing import assert_series_equal
from eland.field_mappings import FieldMappings from eland.field_mappings import FieldMappings
from eland.tests import FLIGHTS_INDEX_NAME, FLIGHTS_MAPPING from eland.tests import FLIGHTS_INDEX_NAME, FLIGHTS_MAPPING

View File

@ -14,7 +14,7 @@
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from pandas.util.testing import assert_index_equal from pandas.testing import assert_index_equal
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
from eland.field_mappings import FieldMappings from eland.field_mappings import FieldMappings

View File

@ -14,7 +14,7 @@
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
import pandas as pd import pandas as pd
from pandas.util.testing import assert_index_equal from pandas.testing import assert_index_equal
from eland.tests.common import TestData from eland.tests.common import TestData

View File

@ -17,7 +17,7 @@
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import pytest import pytest
from pandas.util.testing import assert_almost_equal from pandas.testing import assert_frame_equal
from eland.tests.common import TestData from eland.tests.common import TestData
@ -39,8 +39,8 @@ class TestSeriesFrameHist(TestData):
# Numbers are slightly different # Numbers are slightly different
print(pd_bins, ed_bins) print(pd_bins, ed_bins)
assert_almost_equal(pd_bins, ed_bins) assert_frame_equal(pd_bins, ed_bins, check_exact=False)
assert_almost_equal(pd_weights, ed_weights) assert_frame_equal(pd_weights, ed_weights, check_exact=False)
def test_filtered_hist(self): def test_filtered_hist(self):
pd_flights = self.pd_flights() pd_flights = self.pd_flights()
@ -64,8 +64,8 @@ class TestSeriesFrameHist(TestData):
].FlightDelayMin._hist(num_bins=num_bins) ].FlightDelayMin._hist(num_bins=num_bins)
# Numbers are slightly different # Numbers are slightly different
assert_almost_equal(pd_bins, ed_bins) assert_frame_equal(pd_bins, ed_bins, check_exact=False)
assert_almost_equal(pd_weights, ed_weights) assert_frame_equal(pd_weights, ed_weights, check_exact=False)
def test_invalid_hist(self): def test_invalid_hist(self):
with pytest.raises(ValueError): with pytest.raises(ValueError):

View File

@ -14,7 +14,7 @@
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
from pandas.util.testing import assert_almost_equal import numpy as np
from eland.tests.common import TestData from eland.tests.common import TestData
@ -30,7 +30,7 @@ class TestSeriesMetrics(TestData):
for func in self.funcs: for func in self.funcs:
pd_metric = getattr(pd_flights, func)() pd_metric = getattr(pd_flights, func)()
ed_metric = getattr(ed_flights, func)() ed_metric = getattr(ed_flights, func)()
assert_almost_equal(pd_metric, ed_metric, check_less_precise=True) np.testing.assert_almost_equal(pd_metric, ed_metric, decimal=2)
def test_flights_timestamp(self): def test_flights_timestamp(self):
pd_flights = self.pd_flights()["timestamp"] pd_flights = self.pd_flights()["timestamp"]
@ -40,7 +40,7 @@ class TestSeriesMetrics(TestData):
pd_metric = getattr(pd_flights, func)() pd_metric = getattr(pd_flights, func)()
ed_metric = getattr(ed_flights, func)() ed_metric = getattr(ed_flights, func)()
pd_metric = pd_metric.floor("S") # floor or pandas mean with have ns pd_metric = pd_metric.floor("S") # floor or pandas mean with have ns
assert_almost_equal(pd_metric, ed_metric, check_less_precise=True) assert pd_metric == ed_metric
def test_ecommerce_selected_non_numeric_source_fields(self): def test_ecommerce_selected_non_numeric_source_fields(self):
# None of these are numeric # None of these are numeric
@ -61,8 +61,8 @@ class TestSeriesMetrics(TestData):
ed_ecommerce = self.ed_ecommerce()[column] ed_ecommerce = self.ed_ecommerce()[column]
for func in self.funcs: for func in self.funcs:
assert_almost_equal( np.testing.assert_almost_equal(
getattr(pd_ecommerce, func)(), getattr(pd_ecommerce, func)(),
getattr(ed_ecommerce, func)(), getattr(ed_ecommerce, func)(),
check_less_precise=True, decimal=2,
) )

View File

@ -14,7 +14,7 @@
# File called _pytest for PyCharm compatability # File called _pytest for PyCharm compatability
import pytest import pytest
from pandas.util.testing import assert_series_equal from pandas.testing import assert_series_equal
from eland.tests.common import TestData from eland.tests.common import TestData

View File

@ -56,7 +56,7 @@ def pandas_to_eland(
es_if_exists="fail", es_if_exists="fail",
es_refresh=False, es_refresh=False,
es_dropna=False, es_dropna=False,
es_geo_points=None, es_type_overrides=None,
chunksize=None, chunksize=None,
use_pandas_index_for_es_ids=True, use_pandas_index_for_es_ids=True,
): ):
@ -83,8 +83,8 @@ def pandas_to_eland(
es_dropna: bool, default 'False' es_dropna: bool, default 'False'
* True: Remove missing values (see pandas.Series.dropna) * True: Remove missing values (see pandas.Series.dropna)
* False: Include missing values - may cause bulk to fail * False: Include missing values - may cause bulk to fail
es_geo_points: list, default None es_type_overrides: dict, default None
List of columns to map to geo_point data type Dict of field_name: es_data_type that overrides default es data types
chunksize: int, default None chunksize: int, default None
Number of pandas.DataFrame rows to read before bulk index into Elasticsearch Number of pandas.DataFrame rows to read before bulk index into Elasticsearch
use_pandas_index_for_es_ids: bool, default 'True' use_pandas_index_for_es_ids: bool, default 'True'
@ -105,17 +105,18 @@ def pandas_to_eland(
... 'D': pd.Timestamp('20190102'), ... 'D': pd.Timestamp('20190102'),
... 'E': [1.0, 2.0, 3.0], ... 'E': [1.0, 2.0, 3.0],
... 'F': False, ... 'F': False,
... 'G': [1, 2, 3]}, ... 'G': [1, 2, 3],
... 'H': 'Long text - to be indexed as es type text'},
... index=['0', '1', '2']) ... index=['0', '1', '2'])
>>> type(pd_df) >>> type(pd_df)
<class 'pandas.core.frame.DataFrame'> <class 'pandas.core.frame.DataFrame'>
>>> pd_df >>> pd_df
A B ... F G A B ... G H
0 3.141 1 ... False 1 0 3.141 1 ... 1 Long text - to be indexed as es type text
1 3.141 1 ... False 2 1 3.141 1 ... 2 Long text - to be indexed as es type text
2 3.141 1 ... False 3 2 3.141 1 ... 3 Long text - to be indexed as es type text
<BLANKLINE> <BLANKLINE>
[3 rows x 7 columns] [3 rows x 8 columns]
>>> pd_df.dtypes >>> pd_df.dtypes
A float64 A float64
B int64 B int64
@ -124,6 +125,7 @@ def pandas_to_eland(
E float64 E float64
F bool F bool
G int64 G int64
H object
dtype: object dtype: object
Convert `pandas.DataFrame` to `eland.DataFrame` - this creates an Elasticsearch index called `pandas_to_eland`. Convert `pandas.DataFrame` to `eland.DataFrame` - this creates an Elasticsearch index called `pandas_to_eland`.
@ -135,16 +137,17 @@ def pandas_to_eland(
... 'localhost', ... 'localhost',
... 'pandas_to_eland', ... 'pandas_to_eland',
... es_if_exists="replace", ... es_if_exists="replace",
... es_refresh=True) ... es_refresh=True,
... es_type_overrides={'H':'text'}) # index field 'H' as text not keyword
>>> type(ed_df) >>> type(ed_df)
<class 'eland.dataframe.DataFrame'> <class 'eland.dataframe.DataFrame'>
>>> ed_df >>> ed_df
A B ... F G A B ... G H
0 3.141 1 ... False 1 0 3.141 1 ... 1 Long text - to be indexed as es type text
1 3.141 1 ... False 2 1 3.141 1 ... 2 Long text - to be indexed as es type text
2 3.141 1 ... False 3 2 3.141 1 ... 3 Long text - to be indexed as es type text
<BLANKLINE> <BLANKLINE>
[3 rows x 7 columns] [3 rows x 8 columns]
>>> ed_df.dtypes >>> ed_df.dtypes
A float64 A float64
B int64 B int64
@ -153,6 +156,7 @@ def pandas_to_eland(
E float64 E float64
F bool F bool
G int64 G int64
H object
dtype: object dtype: object
See Also See Also
@ -163,7 +167,7 @@ def pandas_to_eland(
if chunksize is None: if chunksize is None:
chunksize = DEFAULT_CHUNK_SIZE chunksize = DEFAULT_CHUNK_SIZE
mapping = FieldMappings._generate_es_mappings(pd_df, es_geo_points) mapping = FieldMappings._generate_es_mappings(pd_df, es_type_overrides)
es_client = ensure_es_client(es_client) es_client = ensure_es_client(es_client)
# If table exists, check if_exists parameter # If table exists, check if_exists parameter
@ -283,7 +287,7 @@ def read_csv(
es_if_exists="fail", es_if_exists="fail",
es_refresh=False, es_refresh=False,
es_dropna=False, es_dropna=False,
es_geo_points=None, es_type_overrides=None,
sep=",", sep=",",
delimiter=None, delimiter=None,
# Column and Index Locations and Names # Column and Index Locations and Names
@ -364,8 +368,8 @@ def read_csv(
es_dropna: bool, default 'False' es_dropna: bool, default 'False'
* True: Remove missing values (see pandas.Series.dropna) * True: Remove missing values (see pandas.Series.dropna)
* False: Include missing values - may cause bulk to fail * False: Include missing values - may cause bulk to fail
es_geo_points: list, default None es_type_overrides: dict, default None
List of columns to map to geo_point data type Dict of columns: es_type to override default es datatype mappings
chunksize chunksize
number of csv rows to read before bulk index into Elasticsearch number of csv rows to read before bulk index into Elasticsearch
@ -498,7 +502,7 @@ def read_csv(
chunksize=chunksize, chunksize=chunksize,
es_refresh=es_refresh, es_refresh=es_refresh,
es_dropna=es_dropna, es_dropna=es_dropna,
es_geo_points=es_geo_points, es_type_overrides=es_type_overrides,
) )
first_write = False first_write = False
else: else:
@ -510,7 +514,7 @@ def read_csv(
chunksize=chunksize, chunksize=chunksize,
es_refresh=es_refresh, es_refresh=es_refresh,
es_dropna=es_dropna, es_dropna=es_dropna,
es_geo_points=es_geo_points, es_type_overrides=es_type_overrides,
) )
# Now create an eland.DataFrame that references the new index # Now create an eland.DataFrame that references the new index

View File

@ -1,4 +1,4 @@
elasticsearch>=7.0.5 elasticsearch>=7.6.0
pandas>=1 pandas>=1
matplotlib matplotlib
pytest>=5.2.1 pytest>=5.2.1

View File

@ -1,3 +1,3 @@
elasticsearch>=7.0.5 elasticsearch>=7.6.0
pandas>=1 pandas>=1
matplotlib matplotlib