mirror of
https://github.com/elastic/eland.git
synced 2025-07-11 00:02:14 +08:00
Added 'use_pandas_index_for_es_ids' param to pandas_to_eland()
This commit is contained in:
parent
03582b9f5e
commit
71f2a3f793
@ -15,7 +15,7 @@
|
||||
import os
|
||||
|
||||
import pandas as pd
|
||||
from pandas.util.testing import assert_frame_equal, assert_series_equal
|
||||
from pandas.testing import assert_frame_equal, assert_series_equal
|
||||
|
||||
import eland as ed
|
||||
|
||||
|
@ -65,6 +65,43 @@ class TestDataFrameUtils(TestData):
|
||||
|
||||
assert_pandas_eland_frame_equal(df, ed_df_head)
|
||||
|
||||
ES_TEST_CLIENT.indices.delete(index=index_name)
|
||||
|
||||
def test_pandas_to_eland_ignore_index(self):
|
||||
df = pd.DataFrame(
|
||||
data={
|
||||
"A": np.random.rand(3),
|
||||
"B": 1,
|
||||
"C": "foo",
|
||||
"D": pd.Timestamp("20190102"),
|
||||
"E": [1.0, 2.0, 3.0],
|
||||
"F": False,
|
||||
"G": [1, 2, 3],
|
||||
},
|
||||
index=["0", "1", "2"],
|
||||
)
|
||||
|
||||
# Now create index
|
||||
index_name = "test_pandas_to_eland_ignore_index"
|
||||
|
||||
ed_df = ed.pandas_to_eland(
|
||||
df,
|
||||
ES_TEST_CLIENT,
|
||||
index_name,
|
||||
es_if_exists="replace",
|
||||
es_refresh=True,
|
||||
use_pandas_index_for_es_ids=False,
|
||||
)
|
||||
pd_df = ed.eland_to_pandas(ed_df)
|
||||
|
||||
# Compare values excluding index
|
||||
assert df.values.all() == pd_df.values.all()
|
||||
|
||||
# Ensure that index is populated by ES.
|
||||
assert not (df.index == pd_df.index).any()
|
||||
|
||||
ES_TEST_CLIENT.indices.delete(index=index_name)
|
||||
|
||||
def test_eland_to_pandas_performance(self):
|
||||
# TODO quantify this
|
||||
ed.eland_to_pandas(self.ed_flights(), show_progress=True)
|
||||
|
@ -58,6 +58,7 @@ def pandas_to_eland(
|
||||
es_dropna=False,
|
||||
es_geo_points=None,
|
||||
chunksize=None,
|
||||
use_pandas_index_for_es_ids=True,
|
||||
):
|
||||
"""
|
||||
Append a pandas DataFrame to an Elasticsearch index.
|
||||
@ -86,7 +87,10 @@ def pandas_to_eland(
|
||||
es_geo_points: list, default None
|
||||
List of columns to map to geo_point data type
|
||||
chunksize: int, default None
|
||||
number of pandas.DataFrame rows to read before bulk index into Elasticsearch
|
||||
Number of pandas.DataFrame rows to read before bulk index into Elasticsearch
|
||||
use_pandas_index_for_es_ids: bool, default 'True'
|
||||
* True: pandas.DataFrame.index fields will be used to populate Elasticsearch '_id' fields.
|
||||
* False: Ignore pandas.DataFrame.index when indexing into Elasticsearch
|
||||
|
||||
Returns
|
||||
-------
|
||||
@ -185,16 +189,19 @@ def pandas_to_eland(
|
||||
actions = []
|
||||
n = 0
|
||||
for row in pd_df.iterrows():
|
||||
# Use index as _id
|
||||
id = row[0]
|
||||
|
||||
if es_dropna:
|
||||
values = row[1].dropna().to_dict()
|
||||
else:
|
||||
values = row[1].to_dict()
|
||||
|
||||
# Use integer as id field for repeatable results
|
||||
action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
|
||||
if use_pandas_index_for_es_ids:
|
||||
# Use index as _id
|
||||
id = row[0]
|
||||
|
||||
# Use integer as id field for repeatable results
|
||||
action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
|
||||
else:
|
||||
action = {"_index": es_dest_index, "_source": values}
|
||||
|
||||
actions.append(action)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user