Improve efficiency of 'pandas_to_eland()' using 'parallel_bulk()'

P. Sai Vinay 2020-10-08 20:47:22 +05:30 committed by GitHub
parent 225a23a59a
commit 0dd247b693

@@ -16,8 +16,8 @@
 # under the License.
 import csv
-from typing import Union, List, Tuple, Optional, Mapping, Dict, Any
+from typing import Generator, Union, List, Tuple, Optional, Mapping, Dict, Any
+from collections import deque
 import pandas as pd  # type: ignore
 from pandas.io.parsers import _c_parser_defaults  # type: ignore
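The new Generator import supports the return annotation on the action generator added further down. As a reminder of the convention, Generator[Y, S, R] parameterizes the yield type, the send type, and the return type. A tiny standalone sketch (the "demo" index name is illustrative, not from the commit):

from typing import Any, Dict, Generator

def actions() -> Generator[Dict[str, Any], None, None]:
    # Yields Dict[str, Any]; accepts nothing via send(); returns nothing.
    yield {"_index": "demo", "_source": {"value": 1}}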
@@ -26,7 +26,7 @@ from eland.field_mappings import FieldMappings, verify_mapping_compatibility
 from eland.common import ensure_es_client, DEFAULT_CHUNK_SIZE
 from eland.utils import deprecated_api
 from elasticsearch import Elasticsearch  # type: ignore
-from elasticsearch.helpers import bulk  # type: ignore
+from elasticsearch.helpers import parallel_bulk  # type: ignore
 @deprecated_api("eland.DataFrame()")
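This import swap is the core of the change. bulk() sends requests eagerly and returns a (success_count, errors) summary, whereas parallel_bulk() returns a lazy generator of per-item (ok, info) tuples fanned out over a thread pool, so nothing is indexed until the caller consumes the generator. A minimal sketch of the consumption pattern, assuming a local cluster and a hypothetical "demo" index:

from collections import deque
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

es = Elasticsearch("http://localhost:9200")  # assumed local cluster
docs = ({"_index": "demo", "_source": {"x": i}} for i in range(1000))

# parallel_bulk is lazy: wrap it in deque(maxlen=0) to drive it to
# completion while discarding the per-item results.
deque(parallel_bulk(client=es, actions=docs, thread_count=4), maxlen=0)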
@@ -67,6 +67,7 @@ def pandas_to_eland(
     es_refresh: bool = False,
     es_dropna: bool = False,
     es_type_overrides: Optional[Mapping[str, str]] = None,
+    thread_count: int = 4,
     chunksize: Optional[int] = None,
     use_pandas_index_for_es_ids: bool = True,
 ) -> DataFrame:
@@ -95,6 +96,8 @@ def pandas_to_eland(
         * False: Include missing values - may cause bulk to fail
     es_type_overrides: dict, default None
         Dict of field_name: es_data_type that overrides default es data types
+    thread_count: int
+        Number of threads to use for the bulk requests
     chunksize: int, default None
         Number of pandas.DataFrame rows to read before bulk index into Elasticsearch
     use_pandas_index_for_es_ids: bool, default 'True'
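With the new parameter in place, a call might look like the following sketch (the host, index name, and sizes are illustrative, not part of the commit):

import pandas as pd
from eland import pandas_to_eland

df = pd.DataFrame({"a": range(100_000), "b": ["text"] * 100_000})

ed_df = pandas_to_eland(
    df,
    es_client="http://localhost:9200",  # assumed local cluster
    es_dest_index="pandas_demo",        # illustrative index name
    es_if_exists="replace",
    es_refresh=True,
    thread_count=8,    # 8 parallel bulk workers
    chunksize=10_000,  # rows per chunk, split across the workers
)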
@@ -205,33 +208,45 @@
     else:
         es_client.indices.create(index=es_dest_index, body=mapping)
     # Now add data
-    actions = []
-    n = 0
-    for row in pd_df.iterrows():
-        if es_dropna:
-            values = row[1].dropna().to_dict()
-        else:
-            values = row[1].to_dict()
+    def action_generator(
+        pd_df: pd.DataFrame,
+        es_dropna: bool,
+        use_pandas_index_for_es_ids: bool,
+        es_dest_index: str,
+    ) -> Generator[Dict[str, Any], None, None]:
+        for row in pd_df.iterrows():
+            if es_dropna:
+                values = row[1].dropna().to_dict()
+            else:
+                values = row[1].to_dict()
-        if use_pandas_index_for_es_ids:
-            # Use index as _id
-            id = row[0]
+            if use_pandas_index_for_es_ids:
+                # Use index as _id
+                id = row[0]
-            # Use integer as id field for repeatable results
-            action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
-        else:
-            action = {"_index": es_dest_index, "_source": values}
+                # Use integer as id field for repeatable results
+                action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
+            else:
+                action = {"_index": es_dest_index, "_source": values}
-        actions.append(action)
+            yield action
-        n = n + 1
-        if n % chunksize == 0:
-            bulk(client=es_client, actions=actions, refresh=es_refresh)
-            actions = []
-    bulk(client=es_client, actions=actions, refresh=es_refresh)
+    # parallel_bulk is a lazy generator, so wrap it in deque to consume it immediately
+    # maxlen=0 because the results of parallel_bulk are not needed
+    deque(
+        parallel_bulk(
+            client=es_client,
+            actions=action_generator(
+                pd_df, es_dropna, use_pandas_index_for_es_ids, es_dest_index
+            ),
+            thread_count=thread_count,
+            chunk_size=int(chunksize / thread_count),
+        ),
+        maxlen=0,
+    )
+    if es_refresh:
+        es_client.indices.refresh(index=es_dest_index)
     return DataFrame(es_client, es_dest_index)
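Two details of the new code are worth spelling out. Dividing chunksize by thread_count means each worker thread sends smaller bulk requests while the total number of rows in flight stays at roughly chunksize. And deque(..., maxlen=0) is the standard idiom for exhausting an iterator at C speed while retaining nothing. A self-contained sketch of that draining idiom:

from collections import deque

def producer():
    for i in range(3):
        print(f"side effect {i}")  # stands in for a bulk request
        yield i

# maxlen=0 stores nothing: each yielded value is pulled and immediately
# discarded, so the generator runs to completion in O(1) memory.
deque(producer(), maxlen=0)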