diff --git a/eland/etl.py b/eland/etl.py
index 1700e90..e24b396 100644
--- a/eland/etl.py
+++ b/eland/etl.py
@@ -16,8 +16,8 @@
 # under the License.
 
 import csv
-from typing import Union, List, Tuple, Optional, Mapping, Dict, Any
-
+from typing import Generator, Union, List, Tuple, Optional, Mapping, Dict, Any
+from collections import deque
 import pandas as pd  # type: ignore
 from pandas.io.parsers import _c_parser_defaults  # type: ignore
 
@@ -26,7 +26,7 @@ from eland.field_mappings import FieldMappings, verify_mapping_compatibility
 from eland.common import ensure_es_client, DEFAULT_CHUNK_SIZE
 from eland.utils import deprecated_api
 from elasticsearch import Elasticsearch  # type: ignore
-from elasticsearch.helpers import bulk  # type: ignore
+from elasticsearch.helpers import parallel_bulk  # type: ignore
 
 
 @deprecated_api("eland.DataFrame()")
@@ -67,6 +67,7 @@ def pandas_to_eland(
     es_refresh: bool = False,
     es_dropna: bool = False,
     es_type_overrides: Optional[Mapping[str, str]] = None,
+    thread_count: int = 4,
     chunksize: Optional[int] = None,
     use_pandas_index_for_es_ids: bool = True,
 ) -> DataFrame:
@@ -95,6 +96,8 @@ def pandas_to_eland(
         * False: Include missing values - may cause bulk to fail
     es_type_overrides: dict, default None
         Dict of field_name: es_data_type that overrides default es data types
+    thread_count: int, default 4
+        Number of threads to use for the bulk requests
     chunksize: int, default None
         Number of pandas.DataFrame rows to read before bulk index into Elasticsearch
     use_pandas_index_for_es_ids: bool, default 'True'
@@ -205,33 +208,45 @@ def pandas_to_eland(
     else:
         es_client.indices.create(index=es_dest_index, body=mapping)
 
-    # Now add data
-    actions = []
-    n = 0
-    for row in pd_df.iterrows():
-        if es_dropna:
-            values = row[1].dropna().to_dict()
-        else:
-            values = row[1].to_dict()
+    def action_generator(
+        pd_df: pd.DataFrame,
+        es_dropna: bool,
+        use_pandas_index_for_es_ids: bool,
+        es_dest_index: str,
+    ) -> Generator[Dict[str, Any], None, None]:
+        for row in pd_df.iterrows():
+            if es_dropna:
+                values = row[1].dropna().to_dict()
+            else:
+                values = row[1].to_dict()
 
-        if use_pandas_index_for_es_ids:
-            # Use index as _id
-            id = row[0]
+            if use_pandas_index_for_es_ids:
+                # Use the pandas index as the Elasticsearch document _id
+                id = row[0]
 
-            # Use integer as id field for repeatable results
-            action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
-        else:
-            action = {"_index": es_dest_index, "_source": values}
+                action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
+            else:
+                action = {"_index": es_dest_index, "_source": values}
 
-        actions.append(action)
+            yield action
 
-        n = n + 1
+    # parallel_bulk is a lazy generator, so drain it with a zero-length deque
+    # maxlen=0 because we don't need the per-document results of parallel_bulk
+    deque(
+        parallel_bulk(
+            client=es_client,
+            actions=action_generator(
+                pd_df, es_dropna, use_pandas_index_for_es_ids, es_dest_index
+            ),
+            thread_count=thread_count,
+            chunk_size=int(chunksize / thread_count),
+        ),
+        maxlen=0,
+    )
 
-        if n % chunksize == 0:
-            bulk(client=es_client, actions=actions, refresh=es_refresh)
-            actions = []
+    if es_refresh:
+        es_client.indices.refresh(index=es_dest_index)
 
-    bulk(client=es_client, actions=actions, refresh=es_refresh)
     return DataFrame(es_client, es_dest_index)
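
A note on the `deque(..., maxlen=0)` idiom in the diff: `parallel_bulk` returns a lazy generator of `(ok, info)` result tuples, and nothing is sent to Elasticsearch until that generator is consumed. A zero-length `deque` is a standard way to drain a generator in O(1) memory. A standalone sketch of the idiom, with a stand-in generator instead of a real `parallel_bulk` call:

```python
from collections import deque

def results():
    # Stand-in for parallel_bulk's lazy generator of (ok, info) tuples
    for i in range(5):
        yield (True, {"index": {"_id": str(i)}})

# maxlen=0 appends and immediately discards every item, so the generator
# is fully consumed (all work actually executes) without buffering results
deque(results(), maxlen=0)
```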
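
For reviewers, a minimal usage sketch of the new `thread_count` parameter. The cluster URL and index name are assumptions for illustration, and the sketch assumes the destination index does not already exist:

```python
import pandas as pd
from elasticsearch import Elasticsearch
from eland import pandas_to_eland

# Hypothetical cluster URL and index name, for illustration only
es = Elasticsearch("http://localhost:9200")
df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# With this change, indexing fans out over 4 worker threads by default;
# each bulk request carries int(chunksize / thread_count) = 2500 documents
ed_df = pandas_to_eland(
    df,
    es_client=es,
    es_dest_index="pandas-demo",
    es_refresh=True,
    thread_count=4,
    chunksize=10_000,
)
```

Note that `es_refresh` is now handled by a single `indices.refresh` call after all bulk requests complete, rather than refreshing on every chunk as the old `bulk(..., refresh=es_refresh)` loop did.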