From 064d43b9efd5c441673f105d20bbb069257255f4 Mon Sep 17 00:00:00 2001 From: Seth Michael Larson Date: Mon, 6 Apr 2020 07:25:25 -0500 Subject: [PATCH] Remove eland.Client, use Elasticsearch directly --- eland/__init__.py | 1 - eland/client.py | 61 ------------------- eland/common.py | 11 +++- eland/dataframe.py | 3 +- eland/field_mappings.py | 4 +- eland/ml/imported_ml_model.py | 14 ++--- eland/ml/ml_model.py | 12 ++-- eland/ndframe.py | 2 +- eland/operations.py | 13 ++-- eland/query_compiler.py | 10 +-- eland/series.py | 4 +- eland/tests/client/__init__.py | 13 ---- .../client/test_perform_request_pytest.py | 40 ------------ .../test_aggregatables_pytest.py | 12 ++-- .../field_mappings/test_datetime_pytest.py | 2 +- .../test_display_names_pytest.py | 10 +-- .../field_mappings/test_dtypes_pytest.py | 4 +- .../test_field_name_pd_dtype_pytest.py | 4 +- .../test_get_field_names_pytest.py | 6 +- .../test_metric_source_fields_pytest.py | 12 ++-- .../field_mappings/test_rename_pytest.py | 8 +-- .../test_scripted_fields_pytest.py | 4 +- eland/tests/setup_tests.py | 4 +- eland/utils.py | 43 ++++++------- 24 files changed, 88 insertions(+), 209 deletions(-) delete mode 100644 eland/client.py delete mode 100644 eland/tests/client/__init__.py delete mode 100644 eland/tests/client/test_perform_request_pytest.py diff --git a/eland/__init__.py b/eland/__init__.py index 6fa5c90..2924bea 100644 --- a/eland/__init__.py +++ b/eland/__init__.py @@ -25,7 +25,6 @@ from eland._version import ( __maintainer_email__, ) from eland.common import * -from eland.client import * from eland.filter import * from eland.index import * from eland.field_mappings import * diff --git a/eland/client.py b/eland/client.py deleted file mode 100644 index 20abe95..0000000 --- a/eland/client.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright 2019 Elasticsearch BV -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from elasticsearch import Elasticsearch -from elasticsearch import helpers - - -class Client: - """ - eland client - implemented as facade to control access to Elasticsearch methods - """ - - def __init__(self, es=None): - if isinstance(es, Elasticsearch): - self._es = es - elif isinstance(es, Client): - self._es = es._es - else: - self._es = Elasticsearch(es) - - def index_create(self, **kwargs): - return self._es.indices.create(**kwargs) - - def index_delete(self, **kwargs): - return self._es.indices.delete(**kwargs) - - def index_exists(self, **kwargs): - return self._es.indices.exists(**kwargs) - - def get_mapping(self, **kwargs): - return self._es.indices.get_mapping(**kwargs) - - def bulk(self, actions, refresh=False): - return helpers.bulk(self._es, actions, refresh=refresh) - - def scan(self, **kwargs): - return helpers.scan(self._es, **kwargs) - - def search(self, **kwargs): - return self._es.search(**kwargs) - - def field_caps(self, **kwargs): - return self._es.field_caps(**kwargs) - - def count(self, **kwargs): - count_json = self._es.count(**kwargs) - return count_json["count"] - - def perform_request(self, method, url, headers=None, params=None, body=None): - return self._es.transport.perform_request(method, url, headers, params, body) diff --git a/eland/common.py b/eland/common.py index 90b055d..0e86ff5 100644 --- a/eland/common.py +++ b/eland/common.py @@ -15,9 +15,10 @@ # Default number of rows displayed (different to pandas where ALL could be displayed) import warnings from enum import Enum -from typing import Union +from typing import Union, List, Tuple import pandas as pd +from elasticsearch import Elasticsearch DEFAULT_NUM_ROWS_DISPLAYED = 60 @@ -259,3 +260,11 @@ def elasticsearch_date_to_pandas_date( ) # TODO investigate how we could generate this just once for a bulk read. return pd.to_datetime(value) + + +def ensure_es_client( + es_client: Union[str, List[str], Tuple[str, ...], Elasticsearch] +) -> Elasticsearch: + if not isinstance(es_client, Elasticsearch): + return Elasticsearch(es_client) + return es_client diff --git a/eland/dataframe.py b/eland/dataframe.py index c434567..38d623f 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -46,8 +46,7 @@ class DataFrame(NDFrame): ---------- client: Elasticsearch client argument(s) (e.g. 'localhost:9200') - elasticsearch-py parameters or - - elasticsearch-py instance or - - eland.Client instance + - elasticsearch-py instance index_pattern: str Elasticsearch index pattern. This can contain wildcards. (e.g. 'flights') columns: list of str, optional diff --git a/eland/field_mappings.py b/eland/field_mappings.py index 2061a0a..fbe50f2 100644 --- a/eland/field_mappings.py +++ b/eland/field_mappings.py @@ -67,7 +67,7 @@ class FieldMappings: """ Parameters ---------- - client: eland.Client + client: elasticsearch.Elasticsearch Elasticsearch client index_pattern: str @@ -82,7 +82,7 @@ class FieldMappings: f"or index_pattern {client} {index_pattern}", ) - get_mapping = client.get_mapping(index=index_pattern) + get_mapping = client.indices.get_mapping(index=index_pattern) # Get all fields (including all nested) and then all field_caps all_fields = FieldMappings._extract_fields_from_mapping(get_mapping) diff --git a/eland/ml/imported_ml_model.py b/eland/ml/imported_ml_model.py index 5ae5a39..96ada49 100644 --- a/eland/ml/imported_ml_model.py +++ b/eland/ml/imported_ml_model.py @@ -42,8 +42,7 @@ class ImportedMLModel(MLModel): ---------- es_client: Elasticsearch client argument(s) - elasticsearch-py parameters or - - elasticsearch-py instance or - - eland.Client instance + - elasticsearch-py instance model_id: str The unique identifier of the trained inference model in Elasticsearch. @@ -160,9 +159,8 @@ class ImportedMLModel(MLModel): serialized_model = str(serializer.serialize_and_compress_model())[ 2:-1 ] # remove `b` and str quotes - self._client.perform_request( - "PUT", - "/_ml/inference/" + self._model_id, + self._client.ml.put_trained_model( + model_id=self._model_id, body={ "input": {"field_names": feature_names}, "compressed_definition": serialized_model, @@ -229,9 +227,7 @@ class ImportedMLModel(MLModel): else: raise NotImplementedError(f"Prediction for type {type(X)}, not supported") - results = self._client.perform_request( - "POST", - "/_ingest/pipeline/_simulate", + results = self._client.ingest.simulate( body={ "pipeline": { "processors": [ @@ -245,7 +241,7 @@ class ImportedMLModel(MLModel): ] }, "docs": docs, - }, + } ) y = [ diff --git a/eland/ml/ml_model.py b/eland/ml/ml_model.py index 9a4cbc4..900ceb7 100644 --- a/eland/ml/ml_model.py +++ b/eland/ml/ml_model.py @@ -13,8 +13,7 @@ # limitations under the License. import elasticsearch - -from eland import Client +from eland.common import ensure_es_client class MLModel: @@ -37,13 +36,12 @@ class MLModel: ---------- es_client: Elasticsearch client argument(s) - elasticsearch-py parameters or - - elasticsearch-py instance or - - eland.Client instance + - elasticsearch-py instance model_id: str The unique identifier of the trained inference model in Elasticsearch. """ - self._client = Client(es_client) + self._client = ensure_es_client(es_client) self._model_id = model_id def delete_model(self): @@ -53,6 +51,6 @@ class MLModel: If model doesn't exist, ignore failure. """ try: - self._client.perform_request("DELETE", "/_ml/inference/" + self._model_id) - except elasticsearch.exceptions.NotFoundError: + self._client.ml.delete_trained_model(model_id=self._model_id) + except elasticsearch.NotFoundError: pass diff --git a/eland/ndframe.py b/eland/ndframe.py index d1b995a..3d8b646 100644 --- a/eland/ndframe.py +++ b/eland/ndframe.py @@ -57,7 +57,7 @@ class NDFrame(ABC): Parameters ---------- - client : eland.Client + client : elasticsearch.Elasticsearch A reference to a Elasticsearch python client """ if query_compiler is None: diff --git a/eland/operations.py b/eland/operations.py index 4810537..63f4563 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -16,6 +16,7 @@ import warnings import pandas as pd from pandas.core.dtypes.common import is_datetime_or_timedelta_dtype +from elasticsearch.helpers import scan from eland import ( Index, @@ -114,7 +115,7 @@ class Operations: field_exists_count = query_compiler._client.count( index=query_compiler._index_pattern, body=body.to_count_body() - ) + )["count"] counts[field] = field_exists_count return pd.Series(data=counts, index=fields) @@ -702,8 +703,10 @@ class Operations: raise else: is_scan = True - es_results = query_compiler._client.scan( - index=query_compiler._index_pattern, query=body + es_results = scan( + client=query_compiler._client, + index=query_compiler._index_pattern, + query=body, ) # create post sort if sort_params is not None: @@ -739,7 +742,7 @@ class Operations: return query_compiler._client.count( index=query_compiler._index_pattern, body=body.to_count_body() - ) + )["count"] def _validate_index_operation(self, query_compiler, items): if not isinstance(items, list): @@ -772,7 +775,7 @@ class Operations: return query_compiler._client.count( index=query_compiler._index_pattern, body=body.to_count_body() - ) + )["count"] def drop_index_values(self, query_compiler, field, items): self._validate_index_operation(query_compiler, items) diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 6a7aae0..b32c4e6 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -18,7 +18,6 @@ import numpy as np import pandas as pd from eland import ( - Client, DEFAULT_PROGRESS_REPORTING_NUM_ROWS, elasticsearch_date_to_pandas_date, ) @@ -26,6 +25,7 @@ from eland import FieldMappings from eland import Index from eland import Operations from eland.filter import QueryFilter +from eland.common import ensure_es_client class QueryCompiler: @@ -67,13 +67,13 @@ class QueryCompiler: ): # Implement copy as we don't deep copy the client if to_copy is not None: - self._client = Client(to_copy._client) + self._client = to_copy._client self._index_pattern = to_copy._index_pattern self._index = Index(self, to_copy._index.index_field) self._operations = copy.deepcopy(to_copy._operations) self._mappings = copy.deepcopy(to_copy._mappings) else: - self._client = Client(client) + self._client = ensure_es_client(client) self._index_pattern = index_pattern # Get and persist mappings, this allows us to correctly # map returned types from Elasticsearch to pandas datatypes @@ -519,10 +519,10 @@ class QueryCompiler: if not isinstance(right, QueryCompiler): raise TypeError(f"Incompatible types {type(self)} != {type(right)}") - if self._client._es != right._client._es: + if self._client != right._client: raise ValueError( f"Can not perform arithmetic operations across different clients" - f"{self._client._es} != {right._client._es}" + f"{self._client} != {right._client}" ) if self._index.index_field != right._index.index_field: diff --git a/eland/series.py b/eland/series.py index 9b597a6..7f9ce02 100644 --- a/eland/series.py +++ b/eland/series.py @@ -63,7 +63,7 @@ class Series(NDFrame): Parameters ---------- - client : eland.Client + client : elasticsearch.Elasticsearch A reference to a Elasticsearch python client index_pattern : str @@ -1046,7 +1046,7 @@ class Series(NDFrame): return a op b a & b == Series - a & b must share same eland.Client, index_pattern and index_field + a & b must share same Elasticsearch client, index_pattern and index_field a == Series, b == numeric or string Naming of the resulting Series diff --git a/eland/tests/client/__init__.py b/eland/tests/client/__init__.py deleted file mode 100644 index 68cb7e8..0000000 --- a/eland/tests/client/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2019 Elasticsearch BV -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/eland/tests/client/test_perform_request_pytest.py b/eland/tests/client/test_perform_request_pytest.py deleted file mode 100644 index 2382d1b..0000000 --- a/eland/tests/client/test_perform_request_pytest.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2019 Elasticsearch BV -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# File called _pytest for PyCharm compatability -import elasticsearch -import pytest - -import eland as ed -from eland.tests import ES_TEST_CLIENT -from eland.tests.common import TestData - - -class TestClientEq(TestData): - def test_perform_request(self): - client = ed.Client(ES_TEST_CLIENT) - - response = client.perform_request("GET", "/_cat/indices/flights") - - # yellow open flights TNUv0iysQSi7F-N5ykWfWQ 1 1 13059 0 5.7mb 5.7mb - tokens = response.split(" ") - - assert tokens[2] == "flights" - assert tokens[6] == "13059" - - def test_bad_perform_request(self): - client = ed.Client(ES_TEST_CLIENT) - - with pytest.raises(elasticsearch.exceptions.NotFoundError): - client.perform_request("GET", "/_cat/indices/non_existant_index") diff --git a/eland/tests/field_mappings/test_aggregatables_pytest.py b/eland/tests/field_mappings/test_aggregatables_pytest.py index 66ff563..7de6566 100644 --- a/eland/tests/field_mappings/test_aggregatables_pytest.py +++ b/eland/tests/field_mappings/test_aggregatables_pytest.py @@ -24,7 +24,7 @@ class TestAggregatables(TestData): @pytest.mark.filterwarnings("ignore:Aggregations not supported") def test_ecommerce_all_aggregatables(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=ECOMMERCE_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME ) aggregatables = ed_field_mappings.aggregatable_field_names() @@ -89,7 +89,7 @@ class TestAggregatables(TestData): } ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), + client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME, display_names=expected.values(), ) @@ -100,14 +100,14 @@ class TestAggregatables(TestData): def test_ecommerce_single_aggregatable_field(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=ECOMMERCE_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME ) assert "user" == ed_field_mappings.aggregatable_field_name("user") def test_ecommerce_single_keyword_aggregatable_field(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=ECOMMERCE_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME ) assert ( @@ -117,7 +117,7 @@ class TestAggregatables(TestData): def test_ecommerce_single_non_existant_field(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=ECOMMERCE_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME ) with pytest.raises(KeyError): @@ -126,7 +126,7 @@ class TestAggregatables(TestData): @pytest.mark.filterwarnings("ignore:Aggregations not supported") def test_ecommerce_single_non_aggregatable_field(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=ECOMMERCE_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME ) assert None is ed_field_mappings.aggregatable_field_name("customer_gender") diff --git a/eland/tests/field_mappings/test_datetime_pytest.py b/eland/tests/field_mappings/test_datetime_pytest.py index 7abea91..b6fcae1 100644 --- a/eland/tests/field_mappings/test_datetime_pytest.py +++ b/eland/tests/field_mappings/test_datetime_pytest.py @@ -64,7 +64,7 @@ class TestDateTime(TestData): def test_all_formats(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=self.time_index_name + client=ES_TEST_CLIENT, index_pattern=self.time_index_name ) # do a rename so display_name for a field is different to es_field_name diff --git a/eland/tests/field_mappings/test_display_names_pytest.py b/eland/tests/field_mappings/test_display_names_pytest.py index e046d6b..04b0659 100644 --- a/eland/tests/field_mappings/test_display_names_pytest.py +++ b/eland/tests/field_mappings/test_display_names_pytest.py @@ -23,7 +23,7 @@ from eland.tests.common import TestData class TestDisplayNames(TestData): def test_init_all_fields(self): field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) expected = self.pd_flights().columns.to_list() @@ -34,7 +34,7 @@ class TestDisplayNames(TestData): expected = ["timestamp", "DestWeather", "DistanceKilometers", "AvgTicketPrice"] field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME, display_names=expected, ) @@ -51,7 +51,7 @@ class TestDisplayNames(TestData): ] field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) field_mappings.display_names = expected @@ -75,7 +75,7 @@ class TestDisplayNames(TestData): ] field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) with pytest.raises(KeyError): @@ -87,7 +87,7 @@ class TestDisplayNames(TestData): def test_invalid_list_type_display_names(self): field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) # not a list like object diff --git a/eland/tests/field_mappings/test_dtypes_pytest.py b/eland/tests/field_mappings/test_dtypes_pytest.py index 958f943..03c38c8 100644 --- a/eland/tests/field_mappings/test_dtypes_pytest.py +++ b/eland/tests/field_mappings/test_dtypes_pytest.py @@ -23,7 +23,7 @@ from eland.tests.common import TestData class TestDTypes(TestData): def test_all_fields(self): field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) pd_flights = self.pd_flights() @@ -34,7 +34,7 @@ class TestDTypes(TestData): expected = ["timestamp", "DestWeather", "DistanceKilometers", "AvgTicketPrice"] field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME, display_names=expected, ) diff --git a/eland/tests/field_mappings/test_field_name_pd_dtype_pytest.py b/eland/tests/field_mappings/test_field_name_pd_dtype_pytest.py index 9413d02..241c107 100644 --- a/eland/tests/field_mappings/test_field_name_pd_dtype_pytest.py +++ b/eland/tests/field_mappings/test_field_name_pd_dtype_pytest.py @@ -25,7 +25,7 @@ from eland.tests.common import TestData class TestFieldNamePDDType(TestData): def test_all_formats(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) pd_flights = self.pd_flights() @@ -39,7 +39,7 @@ class TestFieldNamePDDType(TestData): def test_non_existant(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) with pytest.raises(KeyError): diff --git a/eland/tests/field_mappings/test_get_field_names_pytest.py b/eland/tests/field_mappings/test_get_field_names_pytest.py index 8bd862a..aa41387 100644 --- a/eland/tests/field_mappings/test_get_field_names_pytest.py +++ b/eland/tests/field_mappings/test_get_field_names_pytest.py @@ -25,7 +25,7 @@ from eland.tests.common import TestData class TestGetFieldNames(TestData): def test_get_field_names_all(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) pd_flights = self.pd_flights() @@ -38,7 +38,7 @@ class TestGetFieldNames(TestData): def test_get_field_names_selected(self): expected = ["Carrier", "AvgTicketPrice"] ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME, display_names=expected, ) @@ -53,7 +53,7 @@ class TestGetFieldNames(TestData): def test_get_field_names_scripted(self): expected = ["Carrier", "AvgTicketPrice"] ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME, display_names=expected, ) diff --git a/eland/tests/field_mappings/test_metric_source_fields_pytest.py b/eland/tests/field_mappings/test_metric_source_fields_pytest.py index 3c9d19e..90b1e38 100644 --- a/eland/tests/field_mappings/test_metric_source_fields_pytest.py +++ b/eland/tests/field_mappings/test_metric_source_fields_pytest.py @@ -24,7 +24,7 @@ from eland.tests.common import TestData class TestMetricSourceFields(TestData): def test_flights_all_metric_source_fields(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) pd_flights = self.pd_flights() @@ -38,7 +38,7 @@ class TestMetricSourceFields(TestData): def test_flights_all_metric_source_fields_and_bool(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) pd_flights = self.pd_flights() @@ -54,7 +54,7 @@ class TestMetricSourceFields(TestData): def test_flights_all_metric_source_fields_bool_and_timestamp(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) pd_flights = self.pd_flights() @@ -87,7 +87,7 @@ class TestMetricSourceFields(TestData): user object """ ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), + client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME, display_names=field_names, ) @@ -120,7 +120,7 @@ class TestMetricSourceFields(TestData): user object """ ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), + client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME, display_names=field_names, ) @@ -143,7 +143,7 @@ class TestMetricSourceFields(TestData): taxless_total_price float64 """ ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), + client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME, display_names=field_names, ) diff --git a/eland/tests/field_mappings/test_rename_pytest.py b/eland/tests/field_mappings/test_rename_pytest.py index 07efaed..504e162 100644 --- a/eland/tests/field_mappings/test_rename_pytest.py +++ b/eland/tests/field_mappings/test_rename_pytest.py @@ -22,7 +22,7 @@ from eland.tests.common import TestData class TestRename(TestData): def test_single_rename(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) pd_flights_column_series = self.pd_flights().columns.to_series() @@ -47,7 +47,7 @@ class TestRename(TestData): def test_non_exists_rename(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) pd_flights_column_series = self.pd_flights().columns.to_series() @@ -71,7 +71,7 @@ class TestRename(TestData): def test_exists_and_non_exists_rename(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) pd_flights_column_series = self.pd_flights().columns.to_series() @@ -104,7 +104,7 @@ class TestRename(TestData): def test_multi_rename(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) pd_flights_column_series = self.pd_flights().columns.to_series() diff --git a/eland/tests/field_mappings/test_scripted_fields_pytest.py b/eland/tests/field_mappings/test_scripted_fields_pytest.py index 54c2760..4b2484e 100644 --- a/eland/tests/field_mappings/test_scripted_fields_pytest.py +++ b/eland/tests/field_mappings/test_scripted_fields_pytest.py @@ -25,7 +25,7 @@ from eland.tests.common import TestData class TestScriptedFields(TestData): def test_add_new_scripted_field(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) ed_field_mappings.add_scripted_field( @@ -44,7 +44,7 @@ class TestScriptedFields(TestData): def test_add_duplicate_scripted_field(self): ed_field_mappings = ed.FieldMappings( - client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME + client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME ) ed_field_mappings.add_scripted_field( diff --git a/eland/tests/setup_tests.py b/eland/tests/setup_tests.py index b1b5d52..c2e19c8 100644 --- a/eland/tests/setup_tests.py +++ b/eland/tests/setup_tests.py @@ -14,7 +14,6 @@ import pandas as pd from elasticsearch import helpers -from elasticsearch.client import ClusterClient from eland.tests import ( FLIGHTS_FILE_NAME, @@ -85,9 +84,8 @@ def _setup_data(es): def _update_max_compilations_limit(es, limit="10000/1m"): print("Updating script.max_compilations_rate to ", limit) - cluster_client = ClusterClient(es) body = {"transient": {"script.max_compilations_rate": limit}} - cluster_client.put_settings(body=body) + es.cluster.put_settings(body=body) def _setup_test_mappings(es): diff --git a/eland/utils.py b/eland/utils.py index c83e410..6a05a26 100644 --- a/eland/utils.py +++ b/eland/utils.py @@ -17,9 +17,11 @@ import csv import pandas as pd from pandas.io.parsers import _c_parser_defaults -from eland import Client, DEFAULT_CHUNK_SIZE +from eland import DEFAULT_CHUNK_SIZE from eland import DataFrame from eland import FieldMappings +from eland.common import ensure_es_client +from elasticsearch.helpers import bulk def read_es(es_client, es_index_pattern): @@ -32,8 +34,7 @@ def read_es(es_client, es_index_pattern): ---------- es_client: Elasticsearch client argument(s) - elasticsearch-py parameters or - - elasticsearch-py instance or - - eland.Client instance + - elasticsearch-py instance es_index_pattern: str Elasticsearch index pattern @@ -69,8 +70,7 @@ def pandas_to_eland( ---------- es_client: Elasticsearch client argument(s) - elasticsearch-py parameters or - - elasticsearch-py instance or - - eland.Client instance + - elasticsearch-py instance es_dest_index: str Name of Elasticsearch index to be appended to es_if_exists : {'fail', 'replace', 'append'}, default 'fail' @@ -164,12 +164,11 @@ def pandas_to_eland( if chunksize is None: chunksize = DEFAULT_CHUNK_SIZE - client = Client(es_client) - mapping = FieldMappings._generate_es_mappings(pd_df, es_geo_points) + es_client = ensure_es_client(es_client) # If table exists, check if_exists parameter - if client.index_exists(index=es_dest_index): + if es_client.indices.exists(index=es_dest_index): if es_if_exists == "fail": raise ValueError( f"Could not create the index [{es_dest_index}] because it " @@ -178,12 +177,12 @@ def pandas_to_eland( f"'append' or 'replace' data." ) elif es_if_exists == "replace": - client.index_delete(index=es_dest_index) - client.index_create(index=es_dest_index, body=mapping) + es_client.indices.delete(index=es_dest_index) + es_client.indices.create(index=es_dest_index, body=mapping) # elif if_exists == "append": # TODO validate mapping are compatible else: - client.index_create(index=es_dest_index, body=mapping) + es_client.indices.create(index=es_dest_index, body=mapping) # Now add data actions = [] @@ -208,14 +207,11 @@ def pandas_to_eland( n = n + 1 if n % chunksize == 0: - client.bulk(actions, refresh=es_refresh) + bulk(client=es_client, actions=actions, refresh=es_refresh) actions = [] - client.bulk(actions, refresh=es_refresh) - - ed_df = DataFrame(client, es_dest_index) - - return ed_df + bulk(client=es_client, actions=actions, refresh=es_refresh) + return DataFrame(es_client, es_dest_index) def eland_to_pandas(ed_df, show_progress=False): @@ -357,8 +353,7 @@ def read_csv( ---------- es_client: Elasticsearch client argument(s) - elasticsearch-py parameters or - - elasticsearch-py instance or - - eland.Client instance + - elasticsearch-py instance es_dest_index: str Name of Elasticsearch index to be appended to es_if_exists : {'fail', 'replace', 'append'}, default 'fail' @@ -490,8 +485,6 @@ def read_csv( if chunksize is None: kwds.update(chunksize=DEFAULT_CHUNK_SIZE) - client = Client(es_client) - # read csv in chunks to pandas DataFrame and dump to eland DataFrame (and Elasticsearch) reader = pd.read_csv(filepath_or_buffer, **kwds) @@ -500,7 +493,7 @@ def read_csv( if first_write: pandas_to_eland( chunk, - client, + es_client, es_dest_index, es_if_exists=es_if_exists, chunksize=chunksize, @@ -512,7 +505,7 @@ def read_csv( else: pandas_to_eland( chunk, - client, + es_client, es_dest_index, es_if_exists="append", chunksize=chunksize, @@ -522,6 +515,4 @@ def read_csv( ) # Now create an eland.DataFrame that references the new index - ed_df = DataFrame(client, es_dest_index) - - return ed_df + return DataFrame(es_client, es_dest_index)