diff --git a/eland/common.py b/eland/common.py index 08665c6..6ca3e08 100644 --- a/eland/common.py +++ b/eland/common.py @@ -32,6 +32,7 @@ from typing import ( import pandas as pd # type: ignore from elasticsearch import Elasticsearch +from elasticsearch import __version__ as ES_CLIENT_VERSION if TYPE_CHECKING: from numpy.typing import DTypeLike @@ -48,6 +49,10 @@ PANDAS_VERSION: Tuple[int, ...] = tuple( int(part) for part in pd.__version__.split(".") if part.isdigit() )[:2] +# Starting in 7.15 the client raises DeprecationWarnings +# for some APIs using the 'body' parameter. +ES_CLIENT_HAS_V8_0_DEPRECATIONS = ES_CLIENT_VERSION >= (7, 15) + with warnings.catch_warnings(): warnings.simplefilter("ignore") @@ -329,3 +334,16 @@ def es_version(es_client: Elasticsearch) -> Tuple[int, int, int]: else: eland_es_version = es_client._eland_es_version # type: ignore return eland_es_version + + +def es_api_compat( + method: Callable[..., Dict[str, Any]], **kwargs: Any +) -> Dict[str, Any]: + """Expands the 'body' parameter to top-level parameters + on clients that would raise DeprecationWarnings if used. + """ + if ES_CLIENT_HAS_V8_0_DEPRECATIONS: + body = kwargs.pop("body", None) + if body: + kwargs.update(body) + return method(**kwargs) diff --git a/eland/etl.py b/eland/etl.py index 3b32872..3a60452 100644 --- a/eland/etl.py +++ b/eland/etl.py @@ -24,7 +24,12 @@ from elasticsearch import Elasticsearch from elasticsearch.helpers import parallel_bulk from eland import DataFrame -from eland.common import DEFAULT_CHUNK_SIZE, PANDAS_VERSION, ensure_es_client +from eland.common import ( + DEFAULT_CHUNK_SIZE, + PANDAS_VERSION, + ensure_es_client, + es_api_compat, +) from eland.field_mappings import FieldMappings, verify_mapping_compatibility try: @@ -170,7 +175,7 @@ def pandas_to_eland( elif es_if_exists == "replace": es_client.indices.delete(index=es_dest_index) - es_client.indices.create(index=es_dest_index, body=mapping) + es_api_compat(es_client.indices.create, index=es_dest_index, body=mapping) elif es_if_exists == "append": dest_mapping = es_client.indices.get_mapping(index=es_dest_index)[ @@ -182,7 +187,7 @@ def pandas_to_eland( es_type_overrides=es_type_overrides, ) else: - es_client.indices.create(index=es_dest_index, body=mapping) + es_api_compat(es_client.indices.create, index=es_dest_index, body=mapping) def action_generator( pd_df: pd.DataFrame, diff --git a/eland/operations.py b/eland/operations.py index 48ff43f..8ac8bed 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -45,6 +45,7 @@ from eland.common import ( SortOrder, build_pd_series, elasticsearch_date_to_pandas_date, + es_api_compat, es_version, ) from eland.index import Index @@ -1524,7 +1525,8 @@ def _search_with_scroll( hits_yielded = 0 # Make the initial search with 'scroll' set - resp = client.search( + resp = es_api_compat( + client.search, index=query_compiler._index_pattern, body=body, scroll=DEFAULT_PIT_KEEP_ALIVE, @@ -1601,7 +1603,7 @@ def _search_with_pit_and_search_after( body["pit"] = {"id": pit_id, "keep_alive": DEFAULT_PIT_KEEP_ALIVE} while max_number_of_hits is None or hits_yielded < max_number_of_hits: - resp = client.search(body=body) + resp = es_api_compat(client.search, body=body) hits: List[Dict[str, Any]] = resp["hits"]["hits"] # The point in time ID can change between searches so we