From 71f2a3f7933647a961429f8683787b03a08545dc Mon Sep 17 00:00:00 2001 From: Stephen Dodson Date: Tue, 31 Mar 2020 16:20:47 +0200 Subject: [PATCH] Added 'use_pandas_index_for_es_ids' param to pandas_to_eland() --- eland/tests/common.py | 2 +- eland/tests/dataframe/test_utils_pytest.py | 37 ++++++++++++++++++++++ eland/utils.py | 19 +++++++---- 3 files changed, 51 insertions(+), 7 deletions(-) diff --git a/eland/tests/common.py b/eland/tests/common.py index 7db8cdc..d04fe13 100644 --- a/eland/tests/common.py +++ b/eland/tests/common.py @@ -15,7 +15,7 @@ import os import pandas as pd -from pandas.util.testing import assert_frame_equal, assert_series_equal +from pandas.testing import assert_frame_equal, assert_series_equal import eland as ed diff --git a/eland/tests/dataframe/test_utils_pytest.py b/eland/tests/dataframe/test_utils_pytest.py index 2478fdd..a896397 100644 --- a/eland/tests/dataframe/test_utils_pytest.py +++ b/eland/tests/dataframe/test_utils_pytest.py @@ -65,6 +65,43 @@ class TestDataFrameUtils(TestData): assert_pandas_eland_frame_equal(df, ed_df_head) + ES_TEST_CLIENT.indices.delete(index=index_name) + + def test_pandas_to_eland_ignore_index(self): + df = pd.DataFrame( + data={ + "A": np.random.rand(3), + "B": 1, + "C": "foo", + "D": pd.Timestamp("20190102"), + "E": [1.0, 2.0, 3.0], + "F": False, + "G": [1, 2, 3], + }, + index=["0", "1", "2"], + ) + + # Now create index + index_name = "test_pandas_to_eland_ignore_index" + + ed_df = ed.pandas_to_eland( + df, + ES_TEST_CLIENT, + index_name, + es_if_exists="replace", + es_refresh=True, + use_pandas_index_for_es_ids=False, + ) + pd_df = ed.eland_to_pandas(ed_df) + + # Compare values excluding index + assert df.values.all() == pd_df.values.all() + + # Ensure that index is populated by ES. + assert not (df.index == pd_df.index).any() + + ES_TEST_CLIENT.indices.delete(index=index_name) + def test_eland_to_pandas_performance(self): # TODO quantify this ed.eland_to_pandas(self.ed_flights(), show_progress=True) diff --git a/eland/utils.py b/eland/utils.py index 1b8fb0e..c83e410 100644 --- a/eland/utils.py +++ b/eland/utils.py @@ -58,6 +58,7 @@ def pandas_to_eland( es_dropna=False, es_geo_points=None, chunksize=None, + use_pandas_index_for_es_ids=True, ): """ Append a pandas DataFrame to an Elasticsearch index. @@ -86,7 +87,10 @@ def pandas_to_eland( es_geo_points: list, default None List of columns to map to geo_point data type chunksize: int, default None - number of pandas.DataFrame rows to read before bulk index into Elasticsearch + Number of pandas.DataFrame rows to read before bulk index into Elasticsearch + use_pandas_index_for_es_ids: bool, default 'True' + * True: pandas.DataFrame.index fields will be used to populate Elasticsearch '_id' fields. + * False: Ignore pandas.DataFrame.index when indexing into Elasticsearch Returns ------- @@ -185,16 +189,19 @@ def pandas_to_eland( actions = [] n = 0 for row in pd_df.iterrows(): - # Use index as _id - id = row[0] - if es_dropna: values = row[1].dropna().to_dict() else: values = row[1].to_dict() - # Use integer as id field for repeatable results - action = {"_index": es_dest_index, "_source": values, "_id": str(id)} + if use_pandas_index_for_es_ids: + # Use index as _id + id = row[0] + + # Use integer as id field for repeatable results + action = {"_index": es_dest_index, "_source": values, "_id": str(id)} + else: + action = {"_index": es_dest_index, "_source": values} actions.append(action)