Remove eland.Client, use Elasticsearch directly

Seth Michael Larson, 2020-04-06 07:25:25 -05:00 (committed by GitHub)
parent 29af76101e
commit 064d43b9ef
24 changed files with 88 additions and 209 deletions
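
For context, a minimal sketch of what the change means for callers (illustrative, not part of the diff): the public entry points still accept either elasticsearch-py host parameters or an Elasticsearch instance, but the eland.Client facade that used to wrap them is gone; arguments are now normalized internally with the new ensure_es_client() helper.

from elasticsearch import Elasticsearch
import eland as ed

# Both forms continue to work; 'flights' is the demo index used by eland's test suite.
df = ed.DataFrame("localhost:9200", "flights")                  # hosts argument
df = ed.DataFrame(Elasticsearch("localhost:9200"), "flights")   # existing client instance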

View File

@ -25,7 +25,6 @@ from eland._version import (
__maintainer_email__,
)
from eland.common import *
from eland.client import *
from eland.filter import *
from eland.index import *
from eland.field_mappings import *

View File

@ -1,61 +0,0 @@
# Copyright 2019 Elasticsearch BV
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class Client:
"""
eland client - implemented as facade to control access to Elasticsearch methods
"""
def __init__(self, es=None):
if isinstance(es, Elasticsearch):
self._es = es
elif isinstance(es, Client):
self._es = es._es
else:
self._es = Elasticsearch(es)
def index_create(self, **kwargs):
return self._es.indices.create(**kwargs)
def index_delete(self, **kwargs):
return self._es.indices.delete(**kwargs)
def index_exists(self, **kwargs):
return self._es.indices.exists(**kwargs)
def get_mapping(self, **kwargs):
return self._es.indices.get_mapping(**kwargs)
def bulk(self, actions, refresh=False):
return helpers.bulk(self._es, actions, refresh=refresh)
def scan(self, **kwargs):
return helpers.scan(self._es, **kwargs)
def search(self, **kwargs):
return self._es.search(**kwargs)
def field_caps(self, **kwargs):
return self._es.field_caps(**kwargs)
def count(self, **kwargs):
count_json = self._es.count(**kwargs)
return count_json["count"]
def perform_request(self, method, url, headers=None, params=None, body=None):
return self._es.transport.perform_request(method, url, headers, params, body)
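
For reference, a sketch of the direct elasticsearch-py calls that replace the removed facade methods throughout this commit (the index name and arguments are illustrative); requests that previously went through Client.perform_request now use namespaced APIs such as es.ml.put_trained_model and es.ingest.simulate, as shown in the ML changes below.

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("localhost:9200")
actions = []  # iterable of bulk actions

es.indices.create(index="example")            # was Client.index_create
es.indices.delete(index="example")            # was Client.index_delete
es.indices.exists(index="example")            # was Client.index_exists
es.indices.get_mapping(index="example")       # was Client.get_mapping
helpers.bulk(es, actions, refresh=False)      # was Client.bulk
helpers.scan(es, index="example", query={})   # was Client.scan
es.count(index="example")["count"]            # was Client.count (returned the bare count)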

View File

@ -15,9 +15,10 @@
# Default number of rows displayed (different to pandas where ALL could be displayed)
import warnings
from enum import Enum
from typing import Union
from typing import Union, List, Tuple
import pandas as pd
from elasticsearch import Elasticsearch
DEFAULT_NUM_ROWS_DISPLAYED = 60
@ -259,3 +260,11 @@ def elasticsearch_date_to_pandas_date(
)
# TODO investigate how we could generate this just once for a bulk read.
return pd.to_datetime(value)
def ensure_es_client(
es_client: Union[str, List[str], Tuple[str, ...], Elasticsearch]
) -> Elasticsearch:
if not isinstance(es_client, Elasticsearch):
return Elasticsearch(es_client)
return es_client
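
A short usage sketch of the new helper (the host below is an assumption for illustration):

from elasticsearch import Elasticsearch
from eland.common import ensure_es_client

es = ensure_es_client("localhost:9200")   # hosts argument -> new Elasticsearch client
same = ensure_es_client(es)               # an existing client is returned unchanged
assert same is es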

View File

@ -46,8 +46,7 @@ class DataFrame(NDFrame):
----------
client: Elasticsearch client argument(s) (e.g. 'localhost:9200')
- elasticsearch-py parameters or
- elasticsearch-py instance or
- eland.Client instance
- elasticsearch-py instance
index_pattern: str
Elasticsearch index pattern. This can contain wildcards. (e.g. 'flights')
columns: list of str, optional

View File

@ -67,7 +67,7 @@ class FieldMappings:
"""
Parameters
----------
client: eland.Client
client: elasticsearch.Elasticsearch
Elasticsearch client
index_pattern: str
@ -82,7 +82,7 @@ class FieldMappings:
f"or index_pattern {client} {index_pattern}",
)
get_mapping = client.get_mapping(index=index_pattern)
get_mapping = client.indices.get_mapping(index=index_pattern)
# Get all fields (including all nested) and then all field_caps
all_fields = FieldMappings._extract_fields_from_mapping(get_mapping)

View File

@ -42,8 +42,7 @@ class ImportedMLModel(MLModel):
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance or
- eland.Client instance
- elasticsearch-py instance
model_id: str
The unique identifier of the trained inference model in Elasticsearch.
@ -160,9 +159,8 @@ class ImportedMLModel(MLModel):
serialized_model = str(serializer.serialize_and_compress_model())[
2:-1
] # remove `b` and str quotes
self._client.perform_request(
"PUT",
"/_ml/inference/" + self._model_id,
self._client.ml.put_trained_model(
model_id=self._model_id,
body={
"input": {"field_names": feature_names},
"compressed_definition": serialized_model,
@ -229,9 +227,7 @@ class ImportedMLModel(MLModel):
else:
raise NotImplementedError(f"Prediction for type {type(X)}, not supported")
results = self._client.perform_request(
"POST",
"/_ingest/pipeline/_simulate",
results = self._client.ingest.simulate(
body={
"pipeline": {
"processors": [
@ -245,7 +241,7 @@ class ImportedMLModel(MLModel):
]
},
"docs": docs,
},
}
)
y = [

View File

@ -13,8 +13,7 @@
# limitations under the License.
import elasticsearch
from eland import Client
from eland.common import ensure_es_client
class MLModel:
@ -37,13 +36,12 @@ class MLModel:
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance or
- eland.Client instance
- elasticsearch-py instance
model_id: str
The unique identifier of the trained inference model in Elasticsearch.
"""
self._client = Client(es_client)
self._client = ensure_es_client(es_client)
self._model_id = model_id
def delete_model(self):
@ -53,6 +51,6 @@ class MLModel:
If model doesn't exist, ignore failure.
"""
try:
self._client.perform_request("DELETE", "/_ml/inference/" + self._model_id)
except elasticsearch.exceptions.NotFoundError:
self._client.ml.delete_trained_model(model_id=self._model_id)
except elasticsearch.NotFoundError:
pass
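
The delete now goes through the dedicated ML API rather than a raw perform_request; a standalone sketch of the same pattern (the model id is hypothetical):

import elasticsearch
from elasticsearch import Elasticsearch

es = Elasticsearch("localhost:9200")
try:
    es.ml.delete_trained_model(model_id="my_model")
except elasticsearch.NotFoundError:
    pass  # a missing model is not treated as an error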

View File

@ -57,7 +57,7 @@ class NDFrame(ABC):
Parameters
----------
client : eland.Client
client : elasticsearch.Elasticsearch
A reference to a Elasticsearch python client
"""
if query_compiler is None:

View File

@ -16,6 +16,7 @@ import warnings
import pandas as pd
from pandas.core.dtypes.common import is_datetime_or_timedelta_dtype
from elasticsearch.helpers import scan
from eland import (
Index,
@ -114,7 +115,7 @@ class Operations:
field_exists_count = query_compiler._client.count(
index=query_compiler._index_pattern, body=body.to_count_body()
)
)["count"]
counts[field] = field_exists_count
return pd.Series(data=counts, index=fields)
@ -702,8 +703,10 @@ class Operations:
raise
else:
is_scan = True
es_results = query_compiler._client.scan(
index=query_compiler._index_pattern, query=body
es_results = scan(
client=query_compiler._client,
index=query_compiler._index_pattern,
query=body,
)
# create post sort
if sort_params is not None:
@ -739,7 +742,7 @@ class Operations:
return query_compiler._client.count(
index=query_compiler._index_pattern, body=body.to_count_body()
)
)["count"]
def _validate_index_operation(self, query_compiler, items):
if not isinstance(items, list):
@ -772,7 +775,7 @@ class Operations:
return query_compiler._client.count(
index=query_compiler._index_pattern, body=body.to_count_body()
)
)["count"]
def drop_index_values(self, query_compiler, field, items):
self._validate_index_operation(query_compiler, items)
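
A sketch of the two replacement patterns used above: counts now read the "count" key from the raw response, and scrolling goes through elasticsearch.helpers.scan (index name and query are illustrative):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch("localhost:9200")

total = es.count(index="flights", body={"query": {"match_all": {}}})["count"]
for hit in scan(client=es, index="flights", query={"query": {"match_all": {}}}):
    print(hit["_source"])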

View File

@ -18,7 +18,6 @@ import numpy as np
import pandas as pd
from eland import (
Client,
DEFAULT_PROGRESS_REPORTING_NUM_ROWS,
elasticsearch_date_to_pandas_date,
)
@ -26,6 +25,7 @@ from eland import FieldMappings
from eland import Index
from eland import Operations
from eland.filter import QueryFilter
from eland.common import ensure_es_client
class QueryCompiler:
@ -67,13 +67,13 @@ class QueryCompiler:
):
# Implement copy as we don't deep copy the client
if to_copy is not None:
self._client = Client(to_copy._client)
self._client = to_copy._client
self._index_pattern = to_copy._index_pattern
self._index = Index(self, to_copy._index.index_field)
self._operations = copy.deepcopy(to_copy._operations)
self._mappings = copy.deepcopy(to_copy._mappings)
else:
self._client = Client(client)
self._client = ensure_es_client(client)
self._index_pattern = index_pattern
# Get and persist mappings, this allows us to correctly
# map returned types from Elasticsearch to pandas datatypes
@ -519,10 +519,10 @@ class QueryCompiler:
if not isinstance(right, QueryCompiler):
raise TypeError(f"Incompatible types {type(self)} != {type(right)}")
if self._client._es != right._client._es:
if self._client != right._client:
raise ValueError(
f"Can not perform arithmetic operations across different clients"
f"{self._client._es} != {right._client._es}"
f"{self._client} != {right._client}"
)
if self._index.index_field != right._index.index_field:

View File

@ -63,7 +63,7 @@ class Series(NDFrame):
Parameters
----------
client : eland.Client
client : elasticsearch.Elasticsearch
A reference to a Elasticsearch python client
index_pattern : str
@ -1046,7 +1046,7 @@ class Series(NDFrame):
return a op b
a & b == Series
a & b must share same eland.Client, index_pattern and index_field
a & b must share same Elasticsearch client, index_pattern and index_field
a == Series, b == numeric or string
Naming of the resulting Series

View File

@ -1,13 +0,0 @@
# Copyright 2019 Elasticsearch BV
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -1,40 +0,0 @@
# Copyright 2019 Elasticsearch BV
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# File called _pytest for PyCharm compatability
import elasticsearch
import pytest
import eland as ed
from eland.tests import ES_TEST_CLIENT
from eland.tests.common import TestData
class TestClientEq(TestData):
def test_perform_request(self):
client = ed.Client(ES_TEST_CLIENT)
response = client.perform_request("GET", "/_cat/indices/flights")
# yellow open flights TNUv0iysQSi7F-N5ykWfWQ 1 1 13059 0 5.7mb 5.7mb
tokens = response.split(" ")
assert tokens[2] == "flights"
assert tokens[6] == "13059"
def test_bad_perform_request(self):
client = ed.Client(ES_TEST_CLIENT)
with pytest.raises(elasticsearch.exceptions.NotFoundError):
client.perform_request("GET", "/_cat/indices/non_existant_index")

View File

@ -24,7 +24,7 @@ class TestAggregatables(TestData):
@pytest.mark.filterwarnings("ignore:Aggregations not supported")
def test_ecommerce_all_aggregatables(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=ECOMMERCE_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME
)
aggregatables = ed_field_mappings.aggregatable_field_names()
@ -89,7 +89,7 @@ class TestAggregatables(TestData):
}
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT),
client=ES_TEST_CLIENT,
index_pattern=ECOMMERCE_INDEX_NAME,
display_names=expected.values(),
)
@ -100,14 +100,14 @@ class TestAggregatables(TestData):
def test_ecommerce_single_aggregatable_field(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=ECOMMERCE_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME
)
assert "user" == ed_field_mappings.aggregatable_field_name("user")
def test_ecommerce_single_keyword_aggregatable_field(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=ECOMMERCE_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME
)
assert (
@ -117,7 +117,7 @@ class TestAggregatables(TestData):
def test_ecommerce_single_non_existant_field(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=ECOMMERCE_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME
)
with pytest.raises(KeyError):
@ -126,7 +126,7 @@ class TestAggregatables(TestData):
@pytest.mark.filterwarnings("ignore:Aggregations not supported")
def test_ecommerce_single_non_aggregatable_field(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=ECOMMERCE_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=ECOMMERCE_INDEX_NAME
)
assert None is ed_field_mappings.aggregatable_field_name("customer_gender")

View File

@ -64,7 +64,7 @@ class TestDateTime(TestData):
def test_all_formats(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=self.time_index_name
client=ES_TEST_CLIENT, index_pattern=self.time_index_name
)
# do a rename so display_name for a field is different to es_field_name

View File

@ -23,7 +23,7 @@ from eland.tests.common import TestData
class TestDisplayNames(TestData):
def test_init_all_fields(self):
field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
expected = self.pd_flights().columns.to_list()
@ -34,7 +34,7 @@ class TestDisplayNames(TestData):
expected = ["timestamp", "DestWeather", "DistanceKilometers", "AvgTicketPrice"]
field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT),
client=ES_TEST_CLIENT,
index_pattern=FLIGHTS_INDEX_NAME,
display_names=expected,
)
@ -51,7 +51,7 @@ class TestDisplayNames(TestData):
]
field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
field_mappings.display_names = expected
@ -75,7 +75,7 @@ class TestDisplayNames(TestData):
]
field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
with pytest.raises(KeyError):
@ -87,7 +87,7 @@ class TestDisplayNames(TestData):
def test_invalid_list_type_display_names(self):
field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
# not a list like object

View File

@ -23,7 +23,7 @@ from eland.tests.common import TestData
class TestDTypes(TestData):
def test_all_fields(self):
field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
pd_flights = self.pd_flights()
@ -34,7 +34,7 @@ class TestDTypes(TestData):
expected = ["timestamp", "DestWeather", "DistanceKilometers", "AvgTicketPrice"]
field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT),
client=ES_TEST_CLIENT,
index_pattern=FLIGHTS_INDEX_NAME,
display_names=expected,
)

View File

@ -25,7 +25,7 @@ from eland.tests.common import TestData
class TestFieldNamePDDType(TestData):
def test_all_formats(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
pd_flights = self.pd_flights()
@ -39,7 +39,7 @@ class TestFieldNamePDDType(TestData):
def test_non_existant(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
with pytest.raises(KeyError):

View File

@ -25,7 +25,7 @@ from eland.tests.common import TestData
class TestGetFieldNames(TestData):
def test_get_field_names_all(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
pd_flights = self.pd_flights()
@ -38,7 +38,7 @@ class TestGetFieldNames(TestData):
def test_get_field_names_selected(self):
expected = ["Carrier", "AvgTicketPrice"]
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT),
client=ES_TEST_CLIENT,
index_pattern=FLIGHTS_INDEX_NAME,
display_names=expected,
)
@ -53,7 +53,7 @@ class TestGetFieldNames(TestData):
def test_get_field_names_scripted(self):
expected = ["Carrier", "AvgTicketPrice"]
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT),
client=ES_TEST_CLIENT,
index_pattern=FLIGHTS_INDEX_NAME,
display_names=expected,
)

View File

@ -24,7 +24,7 @@ from eland.tests.common import TestData
class TestMetricSourceFields(TestData):
def test_flights_all_metric_source_fields(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
pd_flights = self.pd_flights()
@ -38,7 +38,7 @@ class TestMetricSourceFields(TestData):
def test_flights_all_metric_source_fields_and_bool(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
pd_flights = self.pd_flights()
@ -54,7 +54,7 @@ class TestMetricSourceFields(TestData):
def test_flights_all_metric_source_fields_bool_and_timestamp(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
pd_flights = self.pd_flights()
@ -87,7 +87,7 @@ class TestMetricSourceFields(TestData):
user object
"""
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT),
client=ES_TEST_CLIENT,
index_pattern=ECOMMERCE_INDEX_NAME,
display_names=field_names,
)
@ -120,7 +120,7 @@ class TestMetricSourceFields(TestData):
user object
"""
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT),
client=ES_TEST_CLIENT,
index_pattern=ECOMMERCE_INDEX_NAME,
display_names=field_names,
)
@ -143,7 +143,7 @@ class TestMetricSourceFields(TestData):
taxless_total_price float64
"""
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT),
client=ES_TEST_CLIENT,
index_pattern=ECOMMERCE_INDEX_NAME,
display_names=field_names,
)

View File

@ -22,7 +22,7 @@ from eland.tests.common import TestData
class TestRename(TestData):
def test_single_rename(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
pd_flights_column_series = self.pd_flights().columns.to_series()
@ -47,7 +47,7 @@ class TestRename(TestData):
def test_non_exists_rename(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
pd_flights_column_series = self.pd_flights().columns.to_series()
@ -71,7 +71,7 @@ class TestRename(TestData):
def test_exists_and_non_exists_rename(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
pd_flights_column_series = self.pd_flights().columns.to_series()
@ -104,7 +104,7 @@ class TestRename(TestData):
def test_multi_rename(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
pd_flights_column_series = self.pd_flights().columns.to_series()

View File

@ -25,7 +25,7 @@ from eland.tests.common import TestData
class TestScriptedFields(TestData):
def test_add_new_scripted_field(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
ed_field_mappings.add_scripted_field(
@ -44,7 +44,7 @@ class TestScriptedFields(TestData):
def test_add_duplicate_scripted_field(self):
ed_field_mappings = ed.FieldMappings(
client=ed.Client(ES_TEST_CLIENT), index_pattern=FLIGHTS_INDEX_NAME
client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME
)
ed_field_mappings.add_scripted_field(

View File

@ -14,7 +14,6 @@
import pandas as pd
from elasticsearch import helpers
from elasticsearch.client import ClusterClient
from eland.tests import (
FLIGHTS_FILE_NAME,
@ -85,9 +84,8 @@ def _setup_data(es):
def _update_max_compilations_limit(es, limit="10000/1m"):
print("Updating script.max_compilations_rate to ", limit)
cluster_client = ClusterClient(es)
body = {"transient": {"script.max_compilations_rate": limit}}
cluster_client.put_settings(body=body)
es.cluster.put_settings(body=body)
def _setup_test_mappings(es):

View File

@ -17,9 +17,11 @@ import csv
import pandas as pd
from pandas.io.parsers import _c_parser_defaults
from eland import Client, DEFAULT_CHUNK_SIZE
from eland import DEFAULT_CHUNK_SIZE
from eland import DataFrame
from eland import FieldMappings
from eland.common import ensure_es_client
from elasticsearch.helpers import bulk
def read_es(es_client, es_index_pattern):
@ -32,8 +34,7 @@ def read_es(es_client, es_index_pattern):
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance or
- eland.Client instance
- elasticsearch-py instance
es_index_pattern: str
Elasticsearch index pattern
@ -69,8 +70,7 @@ def pandas_to_eland(
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance or
- eland.Client instance
- elasticsearch-py instance
es_dest_index: str
Name of Elasticsearch index to be appended to
es_if_exists : {'fail', 'replace', 'append'}, default 'fail'
@ -164,12 +164,11 @@ def pandas_to_eland(
if chunksize is None:
chunksize = DEFAULT_CHUNK_SIZE
client = Client(es_client)
mapping = FieldMappings._generate_es_mappings(pd_df, es_geo_points)
es_client = ensure_es_client(es_client)
# If table exists, check if_exists parameter
if client.index_exists(index=es_dest_index):
if es_client.indices.exists(index=es_dest_index):
if es_if_exists == "fail":
raise ValueError(
f"Could not create the index [{es_dest_index}] because it "
@ -178,12 +177,12 @@ def pandas_to_eland(
f"'append' or 'replace' data."
)
elif es_if_exists == "replace":
client.index_delete(index=es_dest_index)
client.index_create(index=es_dest_index, body=mapping)
es_client.indices.delete(index=es_dest_index)
es_client.indices.create(index=es_dest_index, body=mapping)
# elif if_exists == "append":
# TODO validate mapping are compatible
else:
client.index_create(index=es_dest_index, body=mapping)
es_client.indices.create(index=es_dest_index, body=mapping)
# Now add data
actions = []
@ -208,14 +207,11 @@ def pandas_to_eland(
n = n + 1
if n % chunksize == 0:
client.bulk(actions, refresh=es_refresh)
bulk(client=es_client, actions=actions, refresh=es_refresh)
actions = []
client.bulk(actions, refresh=es_refresh)
ed_df = DataFrame(client, es_dest_index)
return ed_df
bulk(client=es_client, actions=actions, refresh=es_refresh)
return DataFrame(es_client, es_dest_index)
def eland_to_pandas(ed_df, show_progress=False):
@ -357,8 +353,7 @@ def read_csv(
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance or
- eland.Client instance
- elasticsearch-py instance
es_dest_index: str
Name of Elasticsearch index to be appended to
es_if_exists : {'fail', 'replace', 'append'}, default 'fail'
@ -490,8 +485,6 @@ def read_csv(
if chunksize is None:
kwds.update(chunksize=DEFAULT_CHUNK_SIZE)
client = Client(es_client)
# read csv in chunks to pandas DataFrame and dump to eland DataFrame (and Elasticsearch)
reader = pd.read_csv(filepath_or_buffer, **kwds)
@ -500,7 +493,7 @@ def read_csv(
if first_write:
pandas_to_eland(
chunk,
client,
es_client,
es_dest_index,
es_if_exists=es_if_exists,
chunksize=chunksize,
@ -512,7 +505,7 @@ def read_csv(
else:
pandas_to_eland(
chunk,
client,
es_client,
es_dest_index,
es_if_exists="append",
chunksize=chunksize,
@ -522,6 +515,4 @@ def read_csv(
)
# Now create an eland.DataFrame that references the new index
ed_df = DataFrame(client, es_dest_index)
return ed_df
return DataFrame(es_client, es_dest_index)
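
An end-to-end sketch of the ETL entry points after this change (the index name and data are made up; es_client may be host parameters or an Elasticsearch instance, and the helpers are assumed to be exported at the package level as in eland's docs):

import pandas as pd
from elasticsearch import Elasticsearch
import eland as ed

es = Elasticsearch("localhost:9200")
pd_df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

ed_df = ed.pandas_to_eland(pd_df, es, "example-index", es_if_exists="replace", es_refresh=True)
round_trip = ed.eland_to_pandas(ed_df)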