mirror of https://github.com/elastic/eland.git, synced 2025-07-11 00:02:14 +08:00

Add support for Pandas v1.3 and LightGBM v3.x

This commit is contained in:
  parent 22475cdc46
  commit 193bcb73ef
@@ -26,12 +26,14 @@ from elasticsearch import Elasticsearch
 # Default number of rows displayed (different to pandas where ALL could be displayed)
 DEFAULT_NUM_ROWS_DISPLAYED = 60
 DEFAULT_CHUNK_SIZE = 10000
 DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000
 DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000
 DEFAULT_ES_MAX_RESULT_WINDOW = 10000  # index.max_result_window
 DEFAULT_PAGINATION_SIZE = 5000  # for composite aggregations
+PANDAS_VERSION: Tuple[int, ...] = tuple(
+    int(part) for part in pd.__version__.split(".") if part.isdigit()
+)[:2]

 with warnings.catch_warnings():
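This first hunk defines a PANDAS_VERSION tuple next to the existing defaults in eland.common (the eland/etl.py change below imports it from there), so later code can gate behaviour on the installed pandas release. A standalone sketch of the same parse-and-compare pattern, using illustrative version strings that are not part of the commit:

# Standalone sketch of the version-tuple pattern above; the version strings
# are illustrative and not from the commit.
from typing import Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    # Keep only the purely numeric components, then truncate to (major, minor),
    # so a pre-release such as "1.3.0rc1" still compares as (1, 3).
    return tuple(int(part) for part in version.split(".") if part.isdigit())[:2]


assert parse_version("1.2.5") == (1, 2)
assert parse_version("1.3.0rc1") == (1, 3)  # "0rc1" is dropped by isdigit()
assert parse_version("1.3.0") >= (1, 3)     # plain tuple comparison gates features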
eland/etl.py (41 changed lines)
@@ -22,12 +22,18 @@ from typing import Any, Dict, Generator, List, Mapping, Optional, Tuple, Union
 import pandas as pd  # type: ignore
 from elasticsearch import Elasticsearch
 from elasticsearch.helpers import parallel_bulk
-from pandas.io.parsers import _c_parser_defaults  # type: ignore

 from eland import DataFrame
-from eland.common import DEFAULT_CHUNK_SIZE, ensure_es_client
+from eland.common import DEFAULT_CHUNK_SIZE, PANDAS_VERSION, ensure_es_client
 from eland.field_mappings import FieldMappings, verify_mapping_compatibility

+try:
+    from pandas.io.parsers import _c_parser_defaults  # type: ignore
+except ImportError:
+    from pandas.io.parsers.readers import _c_parser_defaults  # type: ignore
+
+_DEFAULT_LOW_MEMORY: bool = _c_parser_defaults["low_memory"]
+

 def pandas_to_eland(
     pd_df: pd.DataFrame,
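pandas 1.3 moved _c_parser_defaults from pandas.io.parsers to pandas.io.parsers.readers, so the import is wrapped in try/except and the resolved low_memory default is cached once as _DEFAULT_LOW_MEMORY. A minimal sketch of the same idiom, with a hypothetical read_chunked helper standing in for csv_to_eland:

# Sketch (not part of the commit): resolve pandas' private low_memory default
# once, then reuse it as a keyword default; read_chunked is a hypothetical
# stand-in for csv_to_eland.
try:
    from pandas.io.parsers import _c_parser_defaults  # pandas < 1.3 layout
except ImportError:
    from pandas.io.parsers.readers import _c_parser_defaults  # pandas >= 1.3 layout

_DEFAULT_LOW_MEMORY: bool = _c_parser_defaults["low_memory"]


def read_chunked(path: str, low_memory: bool = _DEFAULT_LOW_MEMORY) -> None:
    # Hypothetical helper: forwards pandas' own default unless the caller overrides it.
    print(f"would read {path!r} with low_memory={low_memory}")


read_chunked("example.csv")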
@@ -339,11 +345,12 @@ def csv_to_eland(  # type: ignore
     encoding=None,
     dialect=None,
     # Error Handling
-    error_bad_lines=True,
-    warn_bad_lines=True,
+    warn_bad_lines: bool = True,
+    error_bad_lines: bool = True,
+    on_bad_lines: str = "error",
     # Internal
     delim_whitespace=False,
-    low_memory=_c_parser_defaults["low_memory"],
+    low_memory: bool = _DEFAULT_LOW_MEMORY,
     memory_map=False,
     float_precision=None,
 ) -> "DataFrame":
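With the new signature, csv_to_eland accepts both the legacy warn_bad_lines/error_bad_lines booleans and the newer on_bad_lines string on any pandas version; the translation between them happens further down. A hypothetical call illustrating the newer spelling (the Elasticsearch URL, index name, and connection keyword values are assumptions, not part of this commit):

# Hypothetical usage sketch; the URL and index name are placeholders and the
# connection arguments are assumptions drawn from eland's public API.
import eland as ed

ed_df = ed.csv_to_eland(
    "data.csv",
    es_client="http://localhost:9200",
    es_dest_index="my-csv-index",
    es_if_exists="replace",
    on_bad_lines="warn",  # newer spelling; mapped onto warn_bad_lines on pandas < 1.3
)
print(ed_df.shape)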
@@ -481,6 +488,7 @@ def csv_to_eland(  # type: ignore
         "delim_whitespace": delim_whitespace,
         "warn_bad_lines": warn_bad_lines,
         "error_bad_lines": error_bad_lines,
+        "on_bad_lines": on_bad_lines,
         "low_memory": low_memory,
         "mangle_dupe_cols": mangle_dupe_cols,
         "infer_datetime_format": infer_datetime_format,
@@ -490,6 +498,29 @@ def csv_to_eland(  # type: ignore
     if chunksize is None:
         kwargs["chunksize"] = DEFAULT_CHUNK_SIZE

+    if PANDAS_VERSION >= (1, 3):
+        # Bug in Pandas v1.3.0
+        # If names and prefix both passed as None, it's considering them as specified values and throwing ValueError
+        # Ref: https://github.com/pandas-dev/pandas/issues/42387
+        if kwargs["names"] is None and kwargs["prefix"] is None:
+            kwargs.pop("prefix")
+
+        if kwargs["warn_bad_lines"] is True:
+            kwargs["on_bad_lines"] = "warn"
+        if kwargs["error_bad_lines"] is True:
+            kwargs["on_bad_lines"] = "error"
+
+        kwargs.pop("warn_bad_lines")
+        kwargs.pop("error_bad_lines")
+
+    else:
+        if on_bad_lines == "warn":
+            kwargs["warn_bad_lines"] = True
+        if on_bad_lines == "error":
+            kwargs["error_bad_lines"] = True
+
+        kwargs.pop("on_bad_lines")
+
     # read csv in chunks to pandas DataFrame and dump to eland DataFrame (and Elasticsearch)
     reader = pd.read_csv(filepath_or_buffer, **kwargs)
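The block above normalizes the two generations of bad-line options before pd.read_csv is called: on pandas >= 1.3 the booleans are folded into on_bad_lines and dropped, on older pandas the string is translated back into the booleans, and the prefix/names workaround sidesteps pandas issue 42387. A sketch (not from the commit) of what that normalization amounts to, exercised on an in-memory CSV with one malformed row:

# Sketch (not from the commit): the two spellings the normalization above
# reconciles, applied to a CSV whose second data row has too many fields.
import io

import pandas as pd

PANDAS_VERSION = tuple(int(p) for p in pd.__version__.split(".") if p.isdigit())[:2]
csv = io.StringIO("a,b\n1,2\n3,4,5\n6,7\n")  # "3,4,5" is the malformed row

if PANDAS_VERSION >= (1, 3):
    df = pd.read_csv(csv, on_bad_lines="warn")  # newer keyword
else:
    df = pd.read_csv(csv, warn_bad_lines=True, error_bad_lines=False)  # legacy keywords

print(df)  # the malformed row is skipped with a warning under either spelling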
@@ -16,9 +16,15 @@
 # under the License.

 import numpy as np
-from pandas.core.dtypes.generic import ABCIndexClass
 from pandas.plotting._matplotlib import converter

+try:
+    # pandas<1.3.0
+    from pandas.core.dtypes.generic import ABCIndexClass as ABCIndex
+except ImportError:
+    # pandas>=1.3.0
+    from pandas.core.dtypes.generic import ABCIndex
+
 try:  # pandas>=1.2.0
     from pandas.plotting._matplotlib.tools import (
         create_subplots,
@@ -113,7 +119,7 @@ def hist_frame(
         raise NotImplementedError("TODO")

     if column is not None:
-        if not isinstance(column, (list, np.ndarray, ABCIndexClass)):
+        if not isinstance(column, (list, np.ndarray, ABCIndex)):
             column = [column]
         ed_df_bins = ed_df_bins[column]
         ed_df_weights = ed_df_weights[column]
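ABCIndexClass was renamed to ABCIndex in pandas 1.3; importing the old name under the new alias keeps the isinstance check in hist_frame identical on both sides of the rename. A minimal sketch of what that check accepts (normalize_columns is a hypothetical helper mirroring hist_frame's handling of the column argument):

# Minimal sketch (not from the commit) of the column-argument check above.
import numpy as np
import pandas as pd

try:
    from pandas.core.dtypes.generic import ABCIndexClass as ABCIndex  # pandas < 1.3
except ImportError:
    from pandas.core.dtypes.generic import ABCIndex  # pandas >= 1.3


def normalize_columns(column):
    # Hypothetical helper: a single label becomes a one-element list,
    # while lists, arrays, and pandas Index objects pass through.
    if not isinstance(column, (list, np.ndarray, ABCIndex)):
        column = [column]
    return list(column)


print(normalize_columns("price"))                     # ['price']
print(normalize_columns(pd.Index(["price", "qty"])))  # ['price', 'qty']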
@@ -1,5 +1,5 @@
 elasticsearch>=7.7
-pandas>=1
+pandas>=1.2.0
 matplotlib
 pytest>=5.2.1
 pytest-mock
@@ -8,5 +8,5 @@ numpydoc>=0.9.0
 scikit-learn>=0.22.1
 xgboost>=1
 nox
-lightgbm>=2.3.0
+lightgbm
 pytest-cov
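The dev requirements now ask for pandas 1.2+ and accept any LightGBM release, including 3.x, instead of pinning lightgbm>=2.3.0. A quick sketch (not part of the commit) for reporting the locally installed versions against those relaxed pins:

# Sketch (not part of the commit): report locally installed versions against
# the relaxed pins above.
import lightgbm
import pandas as pd

print("pandas   :", pd.__version__)        # requirement: >= 1.2.0
print("lightgbm :", lightgbm.__version__)  # any release, 3.x now included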