Normalize and prune top-level APIs

This commit is contained in:
Seth Michael Larson 2020-05-18 14:55:41 -05:00 committed by GitHub
parent d1444f8e09
commit 1378544933
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 778 additions and 725 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,10 @@
eland.DataFrame.es_info
=======================
.. currentmodule:: eland
.. warning::
Previously this method was named ``info_es()``.
DataFrame.info_es() is deprecated, use DataFrame.es_info() instead.
.. automethod:: DataFrame.es_info

View File

@ -1,6 +0,0 @@
eland.DataFrame.info_es
=======================
.. currentmodule:: eland
.. automethod:: DataFrame.info_es

View File

@ -0,0 +1,6 @@
eland.DataFrame.to_pandas
=========================
.. currentmodule:: eland
.. automethod:: DataFrame.to_pandas

View File

@ -0,0 +1,10 @@
eland.Series.es_info
====================
.. currentmodule:: eland
.. warning::
Previously this method was named ``info_es()``.
Series.info_es() is deprecated, use Series.es_info() instead.
.. automethod:: Series.es_info

View File

@ -1,6 +0,0 @@
eland.Series.info_es
====================
.. currentmodule:: eland
.. automethod:: Series.info_es

View File

@ -0,0 +1,6 @@
eland.DataFrame.to_pandas
=========================
.. currentmodule:: eland
.. automethod:: Series.to_pandas

View File

@ -0,0 +1,6 @@
eland.csv_to_eland
==================
.. currentmodule:: eland
.. autofunction:: csv_to_eland

View File

@ -1,6 +0,0 @@
eland.read_csv
==============
.. currentmodule:: eland
.. autofunction:: read_csv

View File

@ -1,6 +0,0 @@
eland.read_es
=============
.. currentmodule:: eland
.. autofunction:: read_es

View File

@ -82,7 +82,7 @@ Elasticsearch Functions
.. autosummary::
:toctree: api/
DataFrame.info_es
DataFrame.es_info
DataFrame.es_query
Serialization / IO / conversion
@ -95,3 +95,4 @@ Serialization / IO / conversion
DataFrame.to_csv
DataFrame.to_html
DataFrame.to_string
DataFrame.to_pandas

View File

@ -5,13 +5,6 @@ General utility functions
=========================
.. currentmodule:: eland
Elasticsearch access
~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
read_es
Pandas and Eland
~~~~~~~~~~~~~~~~
.. autosummary::

View File

@ -10,4 +10,4 @@ Flat File
.. autosummary::
:toctree: api/
read_csv
csv_to_eland

View File

@ -89,10 +89,11 @@ Serialization / IO / conversion
Series.to_string
Series.to_numpy
Series.to_pandas
Elasticsearch utilities
Elasticsearch Functions
~~~~~~~~~~~~~~~~~~~~~~~
.. autosummary::
:toctree: api/
Series.info_es
Series.es_info

View File

@ -17,7 +17,7 @@ from eland.index import Index
from eland.ndframe import NDFrame
from eland.series import Series
from eland.dataframe import DataFrame
from eland.utils import pandas_to_eland, eland_to_pandas, read_es, read_csv
from eland.etl import pandas_to_eland, eland_to_pandas, read_es, read_csv, csv_to_eland
__all__ = [
"DataFrame",
@ -26,6 +26,7 @@ __all__ = [
"Index",
"pandas_to_eland",
"eland_to_pandas",
"csv_to_eland",
"read_csv",
"read_es",
"SortOrder",

View File

@ -19,10 +19,11 @@ from pandas.io.formats.printing import pprint_thing
from pandas.util._validators import validate_bool_kwarg
import eland.plotting as gfx
from eland import NDFrame
from eland import Series
from eland.ndframe import NDFrame
from eland.series import Series
from eland.common import DEFAULT_NUM_ROWS_DISPLAYED, docstring_parameter
from eland.filter import BooleanFilter
from eland.utils import deprecated_api
class DataFrame(NDFrame):
@ -34,14 +35,14 @@ class DataFrame(NDFrame):
Parameters
----------
client: Elasticsearch client argument(s) (e.g. 'localhost:9200')
es_client: Elasticsearch client argument(s) (e.g. 'localhost:9200')
- elasticsearch-py parameters or
- elasticsearch-py instance
index_pattern: str
es_index_pattern: str
Elasticsearch index pattern. This can contain wildcards. (e.g. 'flights')
columns: list of str, optional
List of DataFrame columns. A subset of the Elasticsearch index's fields.
index_field: str, optional
es_index_field: str, optional
The Elasticsearch index field to use as the DataFrame index. Defaults to _id if None is used.
See Also
@ -68,7 +69,7 @@ class DataFrame(NDFrame):
>>> from elasticsearch import Elasticsearch
>>> es = Elasticsearch("localhost:9200")
>>> df = ed.DataFrame(client=es, index_pattern='flights', columns=['AvgTicketPrice', 'Cancelled'])
>>> df = ed.DataFrame(es_client=es, es_index_pattern='flights', columns=['AvgTicketPrice', 'Cancelled'])
>>> df.head()
AvgTicketPrice Cancelled
0 841.265642 False
@ -83,8 +84,12 @@ class DataFrame(NDFrame):
index field
(TODO - currently index_field must also be a field if not _id)
>>> df = ed.DataFrame(client='localhost', index_pattern='flights', columns=['AvgTicketPrice', 'timestamp'],
... index_field='timestamp')
>>> df = ed.DataFrame(
... es_client='localhost',
... es_index_pattern='flights',
... columns=['AvgTicketPrice', 'timestamp'],
... es_index_field='timestamp'
... )
>>> df.head()
AvgTicketPrice timestamp
2018-01-01T00:00:00 841.265642 2018-01-01 00:00:00
@ -98,12 +103,12 @@ class DataFrame(NDFrame):
def __init__(
self,
client=None,
index_pattern=None,
es_client=None,
es_index_pattern=None,
es_index_field=None,
columns=None,
index_field=None,
query_compiler=None,
):
_query_compiler=None,
) -> None:
"""
There are effectively 2 constructors:
@ -112,18 +117,18 @@ class DataFrame(NDFrame):
The constructor with 'query_compiler' is for internal use only.
"""
if query_compiler is None:
if client is None or index_pattern is None:
if _query_compiler is None:
if es_client is None or es_index_pattern is None:
raise ValueError(
"client and index_pattern must be defined in DataFrame constructor"
)
# python 3 syntax
super().__init__(
client=client,
index_pattern=index_pattern,
es_client=es_client,
es_index_pattern=es_index_pattern,
columns=columns,
index_field=index_field,
query_compiler=query_compiler,
es_index_field=es_index_field,
_query_compiler=_query_compiler,
)
def _get_columns(self):
@ -210,7 +215,7 @@ class DataFrame(NDFrame):
<BLANKLINE>
[3 rows x 2 columns]
"""
return DataFrame(query_compiler=self._query_compiler.head(n))
return DataFrame(_query_compiler=self._query_compiler.head(n))
def tail(self, n: int = 5) -> "DataFrame":
"""
@ -254,7 +259,7 @@ class DataFrame(NDFrame):
<BLANKLINE>
[5 rows x 2 columns]
"""
return DataFrame(query_compiler=self._query_compiler.tail(n))
return DataFrame(_query_compiler=self._query_compiler.tail(n))
def sample(
self, n: int = None, frac: float = None, random_state: int = None
@ -290,7 +295,7 @@ class DataFrame(NDFrame):
raise ValueError("Please enter a value for `frac` OR `n`, not both")
return DataFrame(
query_compiler=self._query_compiler.sample(
_query_compiler=self._query_compiler.sample(
n=n, frac=frac, random_state=random_state
)
)
@ -543,7 +548,7 @@ class DataFrame(NDFrame):
"""
return self._query_compiler.count()
def info_es(self):
def es_info(self):
# noinspection PyPep8
"""
A debug summary of an eland DataFrame internals.
@ -570,10 +575,10 @@ class DataFrame(NDFrame):
12907 2018-02-11 20:08:25 AMS LIM 225
<BLANKLINE>
[5 rows x 4 columns]
>>> print(df.info_es())
index_pattern: flights
>>> print(df.es_info())
es_index_pattern: flights
Index:
index_field: _id
es_index_field: _id
is_source_field: False
Mappings:
capabilities:
@ -593,10 +598,14 @@ class DataFrame(NDFrame):
"""
buf = StringIO()
super()._info_es(buf)
super()._es_info(buf)
return buf.getvalue()
@deprecated_api("eland.DataFrame.es_info()")
def info_es(self):
return self.es_info()
def es_query(self, query):
"""Applies an Elasticsearch DSL query to the current DataFrame.
@ -651,7 +660,7 @@ class DataFrame(NDFrame):
raise TypeError("'query' must be of type 'dict'")
if tuple(query) == ("query",):
query = query["query"]
return DataFrame(query_compiler=self._query_compiler.es_query(query))
return DataFrame(_query_compiler=self._query_compiler.es_query(query))
def _index_summary(self):
# Print index summary e.g.
@ -659,11 +668,11 @@ class DataFrame(NDFrame):
# Do this by getting head and tail of dataframe
if self.empty:
# index[0] is out of bounds for empty df
head = self.head(1)._to_pandas()
tail = self.tail(1)._to_pandas()
head = self.head(1).to_pandas()
tail = self.tail(1).to_pandas()
else:
head = self.head(1)._to_pandas().index[0]
tail = self.tail(1)._to_pandas().index[0]
head = self.head(1).to_pandas().index[0]
tail = self.tail(1).to_pandas().index[0]
index_summary = f", {pprint_thing(head)} to {pprint_thing(tail)}"
name = "Index"
@ -1076,7 +1085,7 @@ class DataFrame(NDFrame):
elif isinstance(key, DataFrame):
return self.where(key)
elif isinstance(key, BooleanFilter):
return DataFrame(query_compiler=self._query_compiler._update_query(key))
return DataFrame(_query_compiler=self._query_compiler._update_query(key))
else:
return self._getitem_column(key)
@ -1088,7 +1097,7 @@ class DataFrame(NDFrame):
def _getitem_array(self, key):
if isinstance(key, Series):
key = key._to_pandas()
key = key.to_pandas()
if is_bool_indexer(key):
if isinstance(key, pd.Series) and not key.index.equals(self.index):
warnings.warn(
@ -1107,7 +1116,7 @@ class DataFrame(NDFrame):
key = pd.RangeIndex(len(self.index))[key]
if len(key):
return DataFrame(
query_compiler=self._query_compiler.getitem_row_array(key)
_query_compiler=self._query_compiler.getitem_row_array(key)
)
else:
return DataFrame(columns=self.columns)
@ -1118,7 +1127,7 @@ class DataFrame(NDFrame):
f" not index"
)
return DataFrame(
query_compiler=self._query_compiler.getitem_column_array(key)
_query_compiler=self._query_compiler.getitem_column_array(key)
)
def _create_or_update_from_compiler(self, new_query_compiler, inplace=False):
@ -1128,13 +1137,13 @@ class DataFrame(NDFrame):
or type(new_query_compiler) in self._query_compiler.__class__.__bases__
), f"Invalid Query Compiler object: {type(new_query_compiler)}"
if not inplace:
return DataFrame(query_compiler=new_query_compiler)
return DataFrame(_query_compiler=new_query_compiler)
else:
self._query_compiler = new_query_compiler
@staticmethod
def _reduce_dimension(query_compiler):
return Series(query_compiler=query_compiler)
return Series(_query_compiler=query_compiler)
def to_csv(
self,
@ -1189,7 +1198,7 @@ class DataFrame(NDFrame):
}
return self._query_compiler.to_csv(**kwargs)
def _to_pandas(self, show_progress=False):
def to_pandas(self, show_progress: bool = False) -> "DataFrame":
"""
Utility method to convert eland.Dataframe to pandas.Dataframe
@ -1256,7 +1265,7 @@ class DataFrame(NDFrame):
Examples
--------
>>> df = ed.read_es('localhost', 'ecommerce')
>>> df = ed.DataFrame('localhost', 'ecommerce')
>>> df.shape
(4675, 45)
"""
@ -1372,7 +1381,7 @@ class DataFrame(NDFrame):
Examples
--------
>>> df = ed.read_es('localhost', 'flights')
>>> df = ed.DataFrame('localhost', 'flights')
>>> df.shape
(13059, 27)
>>> df.query('FlightDelayMin > 60').shape
@ -1380,7 +1389,7 @@ class DataFrame(NDFrame):
"""
if isinstance(expr, BooleanFilter):
return DataFrame(
query_compiler=self._query_compiler._update_query(BooleanFilter(expr))
_query_compiler=self._query_compiler._update_query(BooleanFilter(expr))
)
elif isinstance(expr, str):
column_resolver = {}
@ -1390,7 +1399,7 @@ class DataFrame(NDFrame):
resolvers = column_resolver, {}
# Use pandas eval to parse query - TODO validate this further
filter = eval(expr, target=self, resolvers=tuple(tuple(resolvers)))
return DataFrame(query_compiler=self._query_compiler._update_query(filter))
return DataFrame(_query_compiler=self._query_compiler._update_query(filter))
else:
raise NotImplementedError(expr, type(expr))

521
eland/etl.py Normal file
View File

@ -0,0 +1,521 @@
# Licensed to Elasticsearch B.V under one or more agreements.
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information
import csv
from typing import Union, List, Tuple, Optional, Mapping, Dict, Any
import pandas as pd # type: ignore
from pandas.io.parsers import _c_parser_defaults # type: ignore
from eland import DataFrame
from eland.field_mappings import FieldMappings
from eland.common import ensure_es_client, DEFAULT_CHUNK_SIZE
from eland.utils import deprecated_api
from elasticsearch import Elasticsearch # type: ignore
from elasticsearch.helpers import bulk # type: ignore
@deprecated_api("eland.DataFrame()")
def read_es(
es_client: Union[str, List[str], Tuple[str, ...], Elasticsearch],
es_index_pattern: str,
) -> DataFrame:
"""
Utility method to create an eland.Dataframe from an Elasticsearch index_pattern.
(Similar to pandas.read_csv, but source data is an Elasticsearch index rather than
a csv file)
Parameters
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance
es_index_pattern: str
Elasticsearch index pattern
Returns
-------
eland.DataFrame
See Also
--------
eland.pandas_to_eland: Create an eland.Dataframe from pandas.DataFrame
eland.eland_to_pandas: Create a pandas.Dataframe from eland.DataFrame
"""
return DataFrame(es_client=es_client, es_index_pattern=es_index_pattern)
def pandas_to_eland(
pd_df: pd.DataFrame,
es_client: Union[str, List[str], Tuple[str, ...], Elasticsearch],
es_dest_index: str,
es_if_exists: str = "fail",
es_refresh: bool = False,
es_dropna: bool = False,
es_type_overrides: Optional[Mapping[str, str]] = None,
chunksize: Optional[int] = None,
use_pandas_index_for_es_ids: bool = True,
) -> DataFrame:
"""
Append a pandas DataFrame to an Elasticsearch index.
Mainly used in testing.
Modifies the elasticsearch destination index
Parameters
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance
es_dest_index: str
Name of Elasticsearch index to be appended to
es_if_exists : {'fail', 'replace', 'append'}, default 'fail'
How to behave if the index already exists.
- fail: Raise a ValueError.
- replace: Delete the index before inserting new values.
- append: Insert new values to the existing index. Create if does not exist.
es_refresh: bool, default 'False'
Refresh es_dest_index after bulk index
es_dropna: bool, default 'False'
* True: Remove missing values (see pandas.Series.dropna)
* False: Include missing values - may cause bulk to fail
es_type_overrides: dict, default None
Dict of field_name: es_data_type that overrides default es data types
chunksize: int, default None
Number of pandas.DataFrame rows to read before bulk index into Elasticsearch
use_pandas_index_for_es_ids: bool, default 'True'
* True: pandas.DataFrame.index fields will be used to populate Elasticsearch '_id' fields.
* False: Ignore pandas.DataFrame.index when indexing into Elasticsearch
Returns
-------
eland.Dataframe
eland.DataFrame referencing data in destination_index
Examples
--------
>>> pd_df = pd.DataFrame(data={'A': 3.141,
... 'B': 1,
... 'C': 'foo',
... 'D': pd.Timestamp('20190102'),
... 'E': [1.0, 2.0, 3.0],
... 'F': False,
... 'G': [1, 2, 3],
... 'H': 'Long text - to be indexed as es type text'},
... index=['0', '1', '2'])
>>> type(pd_df)
<class 'pandas.core.frame.DataFrame'>
>>> pd_df
A B ... G H
0 3.141 1 ... 1 Long text - to be indexed as es type text
1 3.141 1 ... 2 Long text - to be indexed as es type text
2 3.141 1 ... 3 Long text - to be indexed as es type text
<BLANKLINE>
[3 rows x 8 columns]
>>> pd_df.dtypes
A float64
B int64
C object
D datetime64[ns]
E float64
F bool
G int64
H object
dtype: object
Convert `pandas.DataFrame` to `eland.DataFrame` - this creates an Elasticsearch index called `pandas_to_eland`.
Overwrite existing Elasticsearch index if it exists `if_exists="replace"`, and sync index so it is
readable on return `refresh=True`
>>> ed_df = ed.pandas_to_eland(pd_df,
... 'localhost',
... 'pandas_to_eland',
... es_if_exists="replace",
... es_refresh=True,
... es_type_overrides={'H':'text'}) # index field 'H' as text not keyword
>>> type(ed_df)
<class 'eland.dataframe.DataFrame'>
>>> ed_df
A B ... G H
0 3.141 1 ... 1 Long text - to be indexed as es type text
1 3.141 1 ... 2 Long text - to be indexed as es type text
2 3.141 1 ... 3 Long text - to be indexed as es type text
<BLANKLINE>
[3 rows x 8 columns]
>>> ed_df.dtypes
A float64
B int64
C object
D datetime64[ns]
E float64
F bool
G int64
H object
dtype: object
See Also
--------
eland.eland_to_pandas: Create a pandas.Dataframe from eland.DataFrame
"""
if chunksize is None:
chunksize = DEFAULT_CHUNK_SIZE
mapping = FieldMappings._generate_es_mappings(pd_df, es_type_overrides)
es_client = ensure_es_client(es_client)
# If table exists, check if_exists parameter
if es_client.indices.exists(index=es_dest_index):
if es_if_exists == "fail":
raise ValueError(
f"Could not create the index [{es_dest_index}] because it "
f"already exists. "
f"Change the if_exists parameter to "
f"'append' or 'replace' data."
)
elif es_if_exists == "replace":
es_client.indices.delete(index=es_dest_index)
es_client.indices.create(index=es_dest_index, body=mapping)
# elif if_exists == "append":
# TODO validate mapping are compatible
else:
es_client.indices.create(index=es_dest_index, body=mapping)
# Now add data
actions = []
n = 0
for row in pd_df.iterrows():
if es_dropna:
values = row[1].dropna().to_dict()
else:
values = row[1].to_dict()
if use_pandas_index_for_es_ids:
# Use index as _id
id = row[0]
# Use integer as id field for repeatable results
action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
else:
action = {"_index": es_dest_index, "_source": values}
actions.append(action)
n = n + 1
if n % chunksize == 0:
bulk(client=es_client, actions=actions, refresh=es_refresh)
actions = []
bulk(client=es_client, actions=actions, refresh=es_refresh)
return DataFrame(es_client, es_dest_index)
def eland_to_pandas(ed_df: DataFrame, show_progress: bool = False) -> pd.DataFrame:
"""
Convert an eland.Dataframe to a pandas.DataFrame
**Note: this loads the entire Elasticsearch index into in core pandas.DataFrame structures. For large
indices this can create significant load on the Elasticsearch cluster and require signficant memory**
Parameters
----------
ed_df: eland.DataFrame
The source eland.Dataframe referencing the Elasticsearch index
show_progress: bool
Output progress of option to stdout? By default False.
Returns
-------
pandas.Dataframe
pandas.DataFrame contains all rows and columns in eland.DataFrame
Examples
--------
>>> ed_df = ed.DataFrame('localhost', 'flights').head()
>>> type(ed_df)
<class 'eland.dataframe.DataFrame'>
>>> ed_df
AvgTicketPrice Cancelled ... dayOfWeek timestamp
0 841.265642 False ... 0 2018-01-01 00:00:00
1 882.982662 False ... 0 2018-01-01 18:27:00
2 190.636904 False ... 0 2018-01-01 17:11:14
3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00
<BLANKLINE>
[5 rows x 27 columns]
Convert `eland.DataFrame` to `pandas.DataFrame` (Note: this loads entire Elasticsearch index into core memory)
>>> pd_df = ed.eland_to_pandas(ed_df)
>>> type(pd_df)
<class 'pandas.core.frame.DataFrame'>
>>> pd_df
AvgTicketPrice Cancelled ... dayOfWeek timestamp
0 841.265642 False ... 0 2018-01-01 00:00:00
1 882.982662 False ... 0 2018-01-01 18:27:00
2 190.636904 False ... 0 2018-01-01 17:11:14
3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00
<BLANKLINE>
[5 rows x 27 columns]
Convert `eland.DataFrame` to `pandas.DataFrame` and show progress every 10000 rows
>>> pd_df = ed.eland_to_pandas(ed.DataFrame('localhost', 'flights'), show_progress=True) # doctest: +SKIP
2020-01-29 12:43:36.572395: read 10000 rows
2020-01-29 12:43:37.309031: read 13059 rows
See Also
--------
eland.pandas_to_eland: Create an eland.Dataframe from pandas.DataFrame
"""
return ed_df.to_pandas(show_progress=show_progress)
def csv_to_eland( # type: ignore
filepath_or_buffer,
es_client: Union[str, List[str], Tuple[str, ...], Elasticsearch],
es_dest_index: str,
es_if_exists: str = "fail",
es_refresh: bool = False,
es_dropna: bool = False,
es_type_overrides: Optional[Mapping[str, str]] = None,
sep=",",
delimiter=None,
# Column and Index Locations and Names
header="infer",
names=None,
index_col=None,
usecols=None,
squeeze=False,
prefix=None,
mangle_dupe_cols=True,
# General Parsing Configuration
dtype=None,
engine=None,
converters=None,
true_values=None,
false_values=None,
skipinitialspace=False,
skiprows=None,
skipfooter=0,
nrows=None,
# Iteration
# iterator=False,
chunksize=None,
# NA and Missing Data Handling
na_values=None,
keep_default_na=True,
na_filter=True,
verbose=False,
skip_blank_lines=True,
# Datetime Handling
parse_dates=False,
infer_datetime_format=False,
keep_date_col=False,
date_parser=None,
dayfirst=False,
cache_dates=True,
# Quoting, Compression, and File Format
compression="infer",
thousands=None,
decimal=b".",
lineterminator=None,
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
doublequote=True,
escapechar=None,
comment=None,
encoding=None,
dialect=None,
# Error Handling
error_bad_lines=True,
warn_bad_lines=True,
# Internal
delim_whitespace=False,
low_memory=_c_parser_defaults["low_memory"],
memory_map=False,
float_precision=None,
) -> "DataFrame":
"""
Read a comma-separated values (csv) file into eland.DataFrame (i.e. an Elasticsearch index).
**Modifies an Elasticsearch index**
**Note pandas iteration options not supported**
Parameters
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance
es_dest_index: str
Name of Elasticsearch index to be appended to
es_if_exists : {'fail', 'replace', 'append'}, default 'fail'
How to behave if the index already exists.
- fail: Raise a ValueError.
- replace: Delete the index before inserting new values.
- append: Insert new values to the existing index. Create if does not exist.
es_dropna: bool, default 'False'
* True: Remove missing values (see pandas.Series.dropna)
* False: Include missing values - may cause bulk to fail
es_type_overrides: dict, default None
Dict of columns: es_type to override default es datatype mappings
chunksize
number of csv rows to read before bulk index into Elasticsearch
Other Parameters
----------------
Parameters derived from :pandas_api_docs:`pandas.read_csv`.
See Also
--------
:pandas_api_docs:`pandas.read_csv`
Notes
-----
iterator not supported
Examples
--------
See if 'churn' index exists in Elasticsearch
>>> from elasticsearch import Elasticsearch # doctest: +SKIP
>>> es = Elasticsearch() # doctest: +SKIP
>>> es.indices.exists(index="churn") # doctest: +SKIP
False
Read 'churn.csv' and use first column as _id (and eland.DataFrame index)
::
# churn.csv
,state,account length,area code,phone number,international plan,voice mail plan,number vmail messages,total day minutes,total day calls,total day charge,total eve minutes,total eve calls,total eve charge,total night minutes,total night calls,total night charge,total intl minutes,total intl calls,total intl charge,customer service calls,churn
0,KS,128,415,382-4657,no,yes,25,265.1,110,45.07,197.4,99,16.78,244.7,91,11.01,10.0,3,2.7,1,0
1,OH,107,415,371-7191,no,yes,26,161.6,123,27.47,195.5,103,16.62,254.4,103,11.45,13.7,3,3.7,1,0
...
>>> ed.csv_to_eland(
... "churn.csv",
... es_client='localhost',
... es_dest_index='churn',
... es_refresh=True,
... index_col=0
... ) # doctest: +SKIP
account length area code churn customer service calls ... total night calls total night charge total night minutes voice mail plan
0 128 415 0 1 ... 91 11.01 244.7 yes
1 107 415 0 1 ... 103 11.45 254.4 yes
2 137 415 0 0 ... 104 7.32 162.6 no
3 84 408 0 2 ... 89 8.86 196.9 no
4 75 415 0 3 ... 121 8.41 186.9 no
... ... ... ... ... ... ... ... ... ...
3328 192 415 0 2 ... 83 12.56 279.1 yes
3329 68 415 0 3 ... 123 8.61 191.3 no
3330 28 510 0 2 ... 91 8.64 191.9 no
3331 184 510 0 2 ... 137 6.26 139.2 no
3332 74 415 0 0 ... 77 10.86 241.4 yes
<BLANKLINE>
[3333 rows x 21 columns]
Validate data now exists in 'churn' index:
>>> es.search(index="churn", size=1) # doctest: +SKIP
{'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 3333, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'churn', '_id': '0', '_score': 1.0, '_source': {'state': 'KS', 'account length': 128, 'area code': 415, 'phone number': '382-4657', 'international plan': 'no', 'voice mail plan': 'yes', 'number vmail messages': 25, 'total day minutes': 265.1, 'total day calls': 110, 'total day charge': 45.07, 'total eve minutes': 197.4, 'total eve calls': 99, 'total eve charge': 16.78, 'total night minutes': 244.7, 'total night calls': 91, 'total night charge': 11.01, 'total intl minutes': 10.0, 'total intl calls': 3, 'total intl charge': 2.7, 'customer service calls': 1, 'churn': 0}}]}}
TODO - currently the eland.DataFrame may not retain the order of the data in the csv.
"""
kwargs: Dict[str, Any] = {
"sep": sep,
"delimiter": delimiter,
"engine": engine,
"dialect": dialect,
"compression": compression,
# "engine_specified": engine_specified,
"doublequote": doublequote,
"escapechar": escapechar,
"quotechar": quotechar,
"quoting": quoting,
"skipinitialspace": skipinitialspace,
"lineterminator": lineterminator,
"header": header,
"index_col": index_col,
"names": names,
"prefix": prefix,
"skiprows": skiprows,
"skipfooter": skipfooter,
"na_values": na_values,
"true_values": true_values,
"false_values": false_values,
"keep_default_na": keep_default_na,
"thousands": thousands,
"comment": comment,
"decimal": decimal,
"parse_dates": parse_dates,
"keep_date_col": keep_date_col,
"dayfirst": dayfirst,
"date_parser": date_parser,
"cache_dates": cache_dates,
"nrows": nrows,
# "iterator": iterator,
"chunksize": chunksize,
"converters": converters,
"dtype": dtype,
"usecols": usecols,
"verbose": verbose,
"encoding": encoding,
"squeeze": squeeze,
"memory_map": memory_map,
"float_precision": float_precision,
"na_filter": na_filter,
"delim_whitespace": delim_whitespace,
"warn_bad_lines": warn_bad_lines,
"error_bad_lines": error_bad_lines,
"low_memory": low_memory,
"mangle_dupe_cols": mangle_dupe_cols,
"infer_datetime_format": infer_datetime_format,
"skip_blank_lines": skip_blank_lines,
}
if chunksize is None:
kwargs["chunksize"] = DEFAULT_CHUNK_SIZE
# read csv in chunks to pandas DataFrame and dump to eland DataFrame (and Elasticsearch)
reader = pd.read_csv(filepath_or_buffer, **kwargs)
first_write = True
for chunk in reader:
if first_write:
pandas_to_eland(
chunk,
es_client,
es_dest_index,
es_if_exists=es_if_exists,
chunksize=chunksize,
es_refresh=es_refresh,
es_dropna=es_dropna,
es_type_overrides=es_type_overrides,
)
first_write = False
else:
pandas_to_eland(
chunk,
es_client,
es_dest_index,
es_if_exists="append",
chunksize=chunksize,
es_refresh=es_refresh,
es_dropna=es_dropna,
es_type_overrides=es_type_overrides,
)
# Now create an eland.DataFrame that references the new index
return DataFrame(es_client, es_index_pattern=es_dest_index)
@deprecated_api("eland.csv_to_eland()")
def read_csv(*args, **kwargs) -> DataFrame: # type: ignore
return csv_to_eland(*args, **kwargs)

View File

@ -14,7 +14,10 @@ from pandas.core.dtypes.common import (
is_string_dtype,
)
from pandas.core.dtypes.inference import is_list_like
from typing import NamedTuple, Optional
from typing import NamedTuple, Optional, Mapping, Dict, TYPE_CHECKING
if TYPE_CHECKING:
from eland import DataFrame
class Field(NamedTuple):
@ -431,7 +434,9 @@ class FieldMappings:
return es_dtype
@staticmethod
def _generate_es_mappings(dataframe, es_type_overrides=None):
def _generate_es_mappings(
dataframe: "DataFrame", es_type_overrides: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
"""Given a pandas dataframe, generate the associated Elasticsearch mapping
Parameters
@ -712,7 +717,7 @@ class FieldMappings:
# Convert return from 'str' to 'np.dtype'
return pd_dtypes.apply(lambda x: np.dtype(x))
def info_es(self, buf):
def es_info(self, buf):
buf.write("Mappings:\n")
buf.write(f" capabilities:\n{self._mappings_capabilities.to_string()}\n")

View File

@ -4,6 +4,7 @@
from typing import Optional, TextIO, TYPE_CHECKING
from eland.utils import deprecated_api
if TYPE_CHECKING:
from .query_compiler import QueryCompiler
@ -30,7 +31,7 @@ class Index:
ID_SORT_FIELD = "_doc" # if index field is _id, sort by _doc
def __init__(
self, query_compiler: "QueryCompiler", index_field: Optional[str] = None
self, query_compiler: "QueryCompiler", es_index_field: Optional[str] = None
):
self._query_compiler = query_compiler
@ -41,7 +42,7 @@ class Index:
# The type:ignore is due to mypy not being smart enough
# to recognize the property.setter has a different type
# than the property.getter.
self.index_field = index_field # type: ignore
self.es_index_field = es_index_field # type: ignore
@property
def sort_field(self) -> str:
@ -54,11 +55,11 @@ class Index:
return self._is_source_field
@property
def index_field(self) -> str:
def es_index_field(self) -> str:
return self._index_field
@index_field.setter
def index_field(self, index_field: Optional[str]) -> None:
@es_index_field.setter
def es_index_field(self, index_field: Optional[str]) -> None:
if index_field is None or index_field == Index.ID_INDEX_FIELD:
self._index_field = Index.ID_INDEX_FIELD
self._is_source_field = False
@ -77,7 +78,11 @@ class Index:
def __iter__(self) -> "Index":
return self
def info_es(self, buf: TextIO) -> None:
def es_info(self, buf: TextIO) -> None:
buf.write("Index:\n")
buf.write(f" index_field: {self.index_field}\n")
buf.write(f" es_index_field: {self.es_index_field}\n")
buf.write(f" is_source_field: {self.is_source_field}\n")
@deprecated_api("eland.Index.es_info()")
def info_es(self, buf: TextIO) -> None:
self.es_info(buf)

View File

@ -36,11 +36,11 @@ only Elasticsearch aggregatable fields can be aggregated or grouped.
class NDFrame(ABC):
def __init__(
self,
client=None,
index_pattern=None,
es_client=None,
es_index_pattern=None,
columns=None,
index_field=None,
query_compiler=None,
es_index_field=None,
_query_compiler=None,
):
"""
pandas.DataFrame/Series like API that proxies into Elasticsearch index(es).
@ -50,14 +50,14 @@ class NDFrame(ABC):
client : elasticsearch.Elasticsearch
A reference to a Elasticsearch python client
"""
if query_compiler is None:
query_compiler = QueryCompiler(
client=client,
index_pattern=index_pattern,
if _query_compiler is None:
_query_compiler = QueryCompiler(
client=es_client,
index_pattern=es_index_pattern,
display_names=columns,
index_field=index_field,
index_field=es_index_field,
)
self._query_compiler = query_compiler
self._query_compiler = _query_compiler
def _get_index(self):
"""
@ -77,11 +77,11 @@ class NDFrame(ABC):
--------
>>> df = ed.DataFrame('localhost', 'flights')
>>> assert isinstance(df.index, ed.Index)
>>> df.index.index_field
>>> df.index.es_index_field
'_id'
>>> s = df['Carrier']
>>> assert isinstance(s.index, ed.Index)
>>> s.index.index_field
>>> s.index.es_index_field
'_id'
"""
return self._query_compiler.index
@ -118,15 +118,15 @@ class NDFrame(ABC):
def _build_repr(self, num_rows):
# self could be Series or DataFrame
if len(self.index) <= num_rows:
return self._to_pandas()
return self.to_pandas()
num_rows = num_rows
head_rows = int(num_rows / 2) + num_rows % 2
tail_rows = num_rows - head_rows
head = self.head(head_rows)._to_pandas()
tail = self.tail(tail_rows)._to_pandas()
head = self.head(head_rows).to_pandas()
tail = self.tail(tail_rows).to_pandas()
return head.append(tail)
@ -142,8 +142,8 @@ class NDFrame(ABC):
"""
return len(self.index)
def _info_es(self, buf):
self._query_compiler.info_es(buf)
def _es_info(self, buf):
self._query_compiler.es_info(buf)
def mean(self, numeric_only=True):
"""
@ -478,7 +478,7 @@ class NDFrame(ABC):
return self._query_compiler.describe()
@abstractmethod
def _to_pandas(self, show_progress=False):
def to_pandas(self, show_progress=False):
pass
@abstractmethod

View File

@ -11,7 +11,7 @@ import numpy as np
import pandas as pd
from elasticsearch.helpers import scan
from eland import Index
from eland.index import Index
from eland.common import (
SortOrder,
DEFAULT_CSV_BATCH_OUTPUT_SIZE,
@ -860,7 +860,7 @@ class Operations:
# This can return None
return size
def info_es(self, query_compiler, buf):
def es_info(self, query_compiler, buf):
buf.write("Operations:\n")
buf.write(f" tasks: {self._tasks}\n")

View File

@ -12,7 +12,7 @@ import pandas as pd
from eland.field_mappings import FieldMappings
from eland.filter import QueryFilter
from eland.operations import Operations
from eland import Index
from eland.index import Index
from eland.common import (
ensure_es_client,
DEFAULT_PROGRESS_REPORTING_NUM_ROWS,
@ -64,7 +64,7 @@ class QueryCompiler:
if to_copy is not None:
self._client = to_copy._client
self._index_pattern = to_copy._index_pattern
self._index = Index(self, to_copy._index.index_field)
self._index = Index(self, to_copy._index.es_index_field)
self._operations = copy.deepcopy(to_copy._operations)
self._mappings: FieldMappings = copy.deepcopy(to_copy._mappings)
else:
@ -240,9 +240,9 @@ class QueryCompiler:
# get index value - can be _id or can be field value in source
if self._index.is_source_field:
index_field = row[self._index.index_field]
index_field = row[self._index.es_index_field]
else:
index_field = hit[self._index.index_field]
index_field = hit[self._index.es_index_field]
index.append(index_field)
# flatten row to map correctly to 2D DataFrame
@ -349,7 +349,7 @@ class QueryCompiler:
index_count: int
Count of docs where index_field exists
"""
return self._operations.index_count(self, self.index.index_field)
return self._operations.index_count(self, self.index.es_index_field)
def _index_matches_count(self, items):
"""
@ -358,7 +358,9 @@ class QueryCompiler:
index_count: int
Count of docs where items exist
"""
return self._operations.index_matches_count(self, self.index.index_field, items)
return self._operations.index_matches_count(
self, self.index.es_index_field, items
)
def _empty_pd_ef(self):
# Return an empty dataframe with correct columns and dtypes
@ -463,7 +465,7 @@ class QueryCompiler:
result._mappings.display_names = new_columns.to_list()
if index is not None:
result._operations.drop_index_values(self, self.index.index_field, index)
result._operations.drop_index_values(self, self.index.es_index_field, index)
return result
@ -503,12 +505,12 @@ class QueryCompiler:
def value_counts(self, es_size):
return self._operations.value_counts(self, es_size)
def info_es(self, buf):
buf.write(f"index_pattern: {self._index_pattern}\n")
def es_info(self, buf):
buf.write(f"es_index_pattern: {self._index_pattern}\n")
self._index.info_es(buf)
self._mappings.info_es(buf)
self._operations.info_es(self, buf)
self._index.es_info(buf)
self._mappings.es_info(buf)
self._operations.es_info(self, buf)
def describe(self):
return self._operations.describe(self)
@ -550,10 +552,10 @@ class QueryCompiler:
f"{self._client} != {right._client}"
)
if self._index.index_field != right._index.index_field:
if self._index.es_index_field != right._index.es_index_field:
raise ValueError(
f"Can not perform arithmetic operations across different index fields "
f"{self._index.index_field} != {right._index.index_field}"
f"{self._index.es_index_field} != {right._index.es_index_field}"
)
if self._index_pattern != right._index_pattern:

View File

@ -40,6 +40,7 @@ from eland.filter import (
ScriptFilter,
IsIn,
)
from eland.utils import deprecated_api
def _get_method_name():
@ -52,13 +53,13 @@ class Series(NDFrame):
Parameters
----------
client : elasticsearch.Elasticsearch
es_client : elasticsearch.Elasticsearch
A reference to a Elasticsearch python client
index_pattern : str
es_index_pattern : str
An Elasticsearch index pattern. This can contain wildcards.
index_field : str
es_index_field : str
The field to base the series on
Notes
@ -72,7 +73,7 @@ class Series(NDFrame):
Examples
--------
>>> ed.Series(client='localhost', index_pattern='flights', name='Carrier')
>>> ed.Series(es_client='localhost', es_index_pattern='flights', name='Carrier')
0 Kibana Airlines
1 Logstash Airways
2 Logstash Airways
@ -89,11 +90,11 @@ class Series(NDFrame):
def __init__(
self,
client=None,
index_pattern=None,
es_client=None,
es_index_pattern=None,
name=None,
index_field=None,
query_compiler=None,
es_index_field=None,
_query_compiler=None,
):
# Series has 1 column
if name is None:
@ -102,11 +103,11 @@ class Series(NDFrame):
columns = [name]
super().__init__(
client=client,
index_pattern=index_pattern,
es_client=es_client,
es_index_pattern=es_index_pattern,
columns=columns,
index_field=index_field,
query_compiler=query_compiler,
es_index_field=es_index_field,
_query_compiler=_query_compiler,
)
hist = eland.plotting.ed_hist_series
@ -217,16 +218,20 @@ class Series(NDFrame):
13058 JetBeats
Name: Airline, Length: 13059, dtype: object
"""
return Series(query_compiler=self._query_compiler.rename({self.name: new_name}))
return Series(
_query_compiler=self._query_compiler.rename({self.name: new_name})
)
def head(self, n=5):
return Series(query_compiler=self._query_compiler.head(n))
return Series(_query_compiler=self._query_compiler.head(n))
def tail(self, n=5):
return Series(query_compiler=self._query_compiler.tail(n))
return Series(_query_compiler=self._query_compiler.tail(n))
def sample(self, n: int = None, frac: float = None, random_state: int = None):
return Series(query_compiler=self._query_compiler.sample(n, frac, random_state))
return Series(
_query_compiler=self._query_compiler.sample(n, frac, random_state)
)
def value_counts(self, es_size=10):
"""
@ -390,7 +395,7 @@ class Series(NDFrame):
result = _buf.getvalue()
return result
def _to_pandas(self, show_progress=False):
def to_pandas(self, show_progress=False):
return self._query_compiler.to_pandas(show_progress=show_progress)[self.name]
@property
@ -484,13 +489,17 @@ class Series(NDFrame):
"""
return 1
def info_es(self):
def es_info(self):
buf = StringIO()
super()._info_es(buf)
super()._es_info(buf)
return buf.getvalue()
@deprecated_api("eland.Series.es_info()")
def info_es(self):
return self.es_info()
def __add__(self, right):
"""
Return addition of series and right, element-wise (binary operator add).
@ -1081,7 +1090,7 @@ class Series(NDFrame):
left_object.arithmetic_operation(method_name, right_object)
series = Series(
query_compiler=self._query_compiler.arithmetic_op_fields(
_query_compiler=self._query_compiler.arithmetic_op_fields(
display_name, left_object
)
)

View File

@ -24,10 +24,10 @@ from eland.tests import (
_pd_flights = pd.read_json(FLIGHTS_DF_FILE_NAME).sort_index()
_pd_flights["timestamp"] = pd.to_datetime(_pd_flights["timestamp"])
_pd_flights.index = _pd_flights.index.map(str) # make index 'object' not int
_ed_flights = ed.read_es(ES_TEST_CLIENT, FLIGHTS_INDEX_NAME)
_ed_flights = ed.DataFrame(ES_TEST_CLIENT, FLIGHTS_INDEX_NAME)
_pd_flights_small = _pd_flights.head(48)
_ed_flights_small = ed.read_es(ES_TEST_CLIENT, FLIGHTS_SMALL_INDEX_NAME)
_ed_flights_small = ed.DataFrame(ES_TEST_CLIENT, FLIGHTS_SMALL_INDEX_NAME)
_pd_ecommerce = pd.read_json(ECOMMERCE_DF_FILE_NAME).sort_index()
_pd_ecommerce["order_date"] = pd.to_datetime(_pd_ecommerce["order_date"])
@ -37,7 +37,7 @@ _pd_ecommerce["products.created_on"] = _pd_ecommerce["products.created_on"].appl
_pd_ecommerce.insert(2, "customer_birth_date", None)
_pd_ecommerce.index = _pd_ecommerce.index.map(str) # make index 'object' not int
_pd_ecommerce["customer_birth_date"].astype("datetime64")
_ed_ecommerce = ed.read_es(ES_TEST_CLIENT, ECOMMERCE_INDEX_NAME)
_ed_ecommerce = ed.DataFrame(ES_TEST_CLIENT, ECOMMERCE_INDEX_NAME)
class TestData:
@ -68,7 +68,7 @@ def assert_pandas_eland_frame_equal(left, right):
raise AssertionError(f"Expected type ed.DataFrame, found {type(right)} instead")
# Use pandas tests to check similarity
assert_frame_equal(left, right._to_pandas())
assert_frame_equal(left, right.to_pandas())
def assert_eland_frame_equal(left, right):
@ -79,7 +79,7 @@ def assert_eland_frame_equal(left, right):
raise AssertionError(f"Expected type ed.DataFrame, found {type(right)} instead")
# Use pandas tests to check similarity
assert_frame_equal(left._to_pandas(), right._to_pandas())
assert_frame_equal(left.to_pandas(), right.to_pandas())
def assert_pandas_eland_series_equal(left, right, check_less_precise=False):
@ -90,4 +90,4 @@ def assert_pandas_eland_series_equal(left, right, check_less_precise=False):
raise AssertionError(f"Expected type ed.Series, found {type(right)} instead")
# Use pandas tests to check similarity
assert_series_equal(left, right._to_pandas(), check_less_precise=check_less_precise)
assert_series_equal(left, right.to_pandas(), check_less_precise=check_less_precise)

View File

@ -101,7 +101,7 @@ class TestDataFrameDateTime(TestData):
# print(df.to_string())
# print(ed_df.to_string())
# print(ed_df.dtypes)
# print(ed_df._to_pandas().dtypes)
# print(ed_df.to_pandas().dtypes)
assert_series_equal(df.dtypes, ed_df.dtypes)
@ -109,7 +109,7 @@ class TestDataFrameDateTime(TestData):
def test_all_formats(self):
index_name = self.time_index_name
ed_df = ed.read_es(ES_TEST_CLIENT, index_name)
ed_df = ed.DataFrame(ES_TEST_CLIENT, index_name)
for format_name in self.time_formats.keys():
times = [

View File

@ -20,15 +20,15 @@ class TestDataFrameInit:
# Construct invalid DataFrame (throws)
with pytest.raises(ValueError):
ed.DataFrame(client=ES_TEST_CLIENT)
ed.DataFrame(es_client=ES_TEST_CLIENT)
# Construct invalid DataFrame (throws)
with pytest.raises(ValueError):
ed.DataFrame(index_pattern=FLIGHTS_INDEX_NAME)
ed.DataFrame(es_index_pattern=FLIGHTS_INDEX_NAME)
# Good constructors
ed.DataFrame(ES_TEST_CLIENT, FLIGHTS_INDEX_NAME)
ed.DataFrame(client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME)
ed.DataFrame(es_client=ES_TEST_CLIENT, es_index_pattern=FLIGHTS_INDEX_NAME)
qc = QueryCompiler(client=ES_TEST_CLIENT, index_pattern=FLIGHTS_INDEX_NAME)
ed.DataFrame(query_compiler=qc)
ed.DataFrame(_query_compiler=qc)

View File

@ -7,7 +7,7 @@ import pytest
from pandas.testing import assert_frame_equal
from eland.tests.common import TestData
from eland.utils import eland_to_pandas
from eland import eland_to_pandas
class TestDataFrameSample(TestData):

View File

@ -65,7 +65,7 @@ class TestDataFrameToCSV(TestData):
test_index = FLIGHTS_INDEX_NAME + "." + str(now_millis)
ed_flights_from_csv = ed.read_csv(
ed_flights_from_csv = ed.csv_to_eland(
results_file,
ES_TEST_CLIENT,
test_index,

View File

@ -43,7 +43,7 @@ class TestScriptedFields(TestData):
# note 'None' is printed as 'NaN' in index, but .index shows it is 'None'
buf = StringIO()
ed_field_mappings.info_es(buf)
ed_field_mappings.es_info(buf)
print(buf.getvalue())
expected = self.pd_flights().columns.to_list()

View File

@ -47,7 +47,7 @@ class TestSeriesFrameHist(TestData):
pd_weights = pd.DataFrame({"FlightDelayMin": pd_filteredhist[0]})
d = ed_flights[ed_flights.FlightDelay == True].FlightDelayMin
print(d.info_es())
print(d.es_info())
ed_bins, ed_weights = ed_flights[
ed_flights.FlightDelay == True

View File

@ -12,4 +12,4 @@ class TestSeriesInfoEs(TestData):
ed_flights = self.ed_flights()["AvgTicketPrice"]
# No assertion, just test it can be called
ed_flights.info_es()
ed_flights.es_info()

View File

@ -23,14 +23,14 @@ class TestSeriesRename(TestData):
print(pd_renamed)
print(ed_renamed)
print(ed_renamed.info_es())
print(ed_renamed.es_info())
assert_pandas_eland_series_equal(pd_renamed, ed_renamed)
pd_renamed2 = pd_renamed.rename("renamed2")
ed_renamed2 = ed_renamed.rename("renamed2")
print(ed_renamed2.info_es())
print(ed_renamed2.es_info())
assert "renamed2" == ed_renamed2.name

View File

@ -14,7 +14,7 @@ class TestSeriesSample(TestData):
SEED = 42
def build_from_index(self, ed_series):
ed2pd_series = ed_series._to_pandas()
ed2pd_series = ed_series.to_pandas()
return self.pd_flights()["Carrier"].iloc[ed2pd_series.index]
def test_sample(self):

View File

@ -2,515 +2,24 @@
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information
import csv
from typing import Union, List, Tuple, Optional, Mapping
import pandas as pd
from pandas.io.parsers import _c_parser_defaults
from eland import DataFrame
from eland.field_mappings import FieldMappings
from eland.common import ensure_es_client, DEFAULT_CHUNK_SIZE
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import functools
import warnings
from typing import Callable, TypeVar
def read_es(
es_client: Union[str, List[str], Tuple[str, ...], Elasticsearch],
es_index_pattern: str,
) -> DataFrame:
"""
Utility method to create an eland.Dataframe from an Elasticsearch index_pattern.
(Similar to pandas.read_csv, but source data is an Elasticsearch index rather than
a csv file)
Parameters
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance
es_index_pattern: str
Elasticsearch index pattern
Returns
-------
eland.DataFrame
See Also
--------
eland.pandas_to_eland: Create an eland.Dataframe from pandas.DataFrame
eland.eland_to_pandas: Create a pandas.Dataframe from eland.DataFrame
"""
return DataFrame(client=es_client, index_pattern=es_index_pattern)
F = TypeVar("F")
def pandas_to_eland(
pd_df: pd.DataFrame,
es_client: Union[str, List[str], Tuple[str, ...], Elasticsearch],
es_dest_index: str,
es_if_exists: str = "fail",
es_refresh: bool = False,
es_dropna: bool = False,
es_type_overrides: Optional[Mapping[str, str]] = None,
chunksize: Optional[int] = None,
use_pandas_index_for_es_ids: bool = True,
) -> DataFrame:
"""
Append a pandas DataFrame to an Elasticsearch index.
Mainly used in testing.
Modifies the elasticsearch destination index
Parameters
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance
es_dest_index: str
Name of Elasticsearch index to be appended to
es_if_exists : {'fail', 'replace', 'append'}, default 'fail'
How to behave if the index already exists.
- fail: Raise a ValueError.
- replace: Delete the index before inserting new values.
- append: Insert new values to the existing index. Create if does not exist.
es_refresh: bool, default 'False'
Refresh es_dest_index after bulk index
es_dropna: bool, default 'False'
* True: Remove missing values (see pandas.Series.dropna)
* False: Include missing values - may cause bulk to fail
es_type_overrides: dict, default None
Dict of field_name: es_data_type that overrides default es data types
chunksize: int, default None
Number of pandas.DataFrame rows to read before bulk index into Elasticsearch
use_pandas_index_for_es_ids: bool, default 'True'
* True: pandas.DataFrame.index fields will be used to populate Elasticsearch '_id' fields.
* False: Ignore pandas.DataFrame.index when indexing into Elasticsearch
Returns
-------
eland.Dataframe
eland.DataFrame referencing data in destination_index
Examples
--------
>>> pd_df = pd.DataFrame(data={'A': 3.141,
... 'B': 1,
... 'C': 'foo',
... 'D': pd.Timestamp('20190102'),
... 'E': [1.0, 2.0, 3.0],
... 'F': False,
... 'G': [1, 2, 3],
... 'H': 'Long text - to be indexed as es type text'},
... index=['0', '1', '2'])
>>> type(pd_df)
<class 'pandas.core.frame.DataFrame'>
>>> pd_df
A B ... G H
0 3.141 1 ... 1 Long text - to be indexed as es type text
1 3.141 1 ... 2 Long text - to be indexed as es type text
2 3.141 1 ... 3 Long text - to be indexed as es type text
<BLANKLINE>
[3 rows x 8 columns]
>>> pd_df.dtypes
A float64
B int64
C object
D datetime64[ns]
E float64
F bool
G int64
H object
dtype: object
Convert `pandas.DataFrame` to `eland.DataFrame` - this creates an Elasticsearch index called `pandas_to_eland`.
Overwrite existing Elasticsearch index if it exists `if_exists="replace"`, and sync index so it is
readable on return `refresh=True`
>>> ed_df = ed.pandas_to_eland(pd_df,
... 'localhost',
... 'pandas_to_eland',
... es_if_exists="replace",
... es_refresh=True,
... es_type_overrides={'H':'text'}) # index field 'H' as text not keyword
>>> type(ed_df)
<class 'eland.dataframe.DataFrame'>
>>> ed_df
A B ... G H
0 3.141 1 ... 1 Long text - to be indexed as es type text
1 3.141 1 ... 2 Long text - to be indexed as es type text
2 3.141 1 ... 3 Long text - to be indexed as es type text
<BLANKLINE>
[3 rows x 8 columns]
>>> ed_df.dtypes
A float64
B int64
C object
D datetime64[ns]
E float64
F bool
G int64
H object
dtype: object
See Also
--------
eland.read_es: Create an eland.Dataframe from an Elasticsearch index
eland.eland_to_pandas: Create a pandas.Dataframe from eland.DataFrame
"""
if chunksize is None:
chunksize = DEFAULT_CHUNK_SIZE
mapping = FieldMappings._generate_es_mappings(pd_df, es_type_overrides)
es_client = ensure_es_client(es_client)
# If table exists, check if_exists parameter
if es_client.indices.exists(index=es_dest_index):
if es_if_exists == "fail":
raise ValueError(
f"Could not create the index [{es_dest_index}] because it "
f"already exists. "
f"Change the if_exists parameter to "
f"'append' or 'replace' data."
def deprecated_api(replace_with: str) -> Callable[[F], F]:
def wrapper(f: F) -> F:
@functools.wraps(f)
def wrapped(*args, **kwargs):
warnings.warn(
f"{f.__name__} is deprecated, use {replace_with} instead",
DeprecationWarning,
)
elif es_if_exists == "replace":
es_client.indices.delete(index=es_dest_index)
es_client.indices.create(index=es_dest_index, body=mapping)
# elif if_exists == "append":
# TODO validate mapping are compatible
else:
es_client.indices.create(index=es_dest_index, body=mapping)
return f(*args, **kwargs)
# Now add data
actions = []
n = 0
for row in pd_df.iterrows():
if es_dropna:
values = row[1].dropna().to_dict()
else:
values = row[1].to_dict()
return wrapped
if use_pandas_index_for_es_ids:
# Use index as _id
id = row[0]
# Use integer as id field for repeatable results
action = {"_index": es_dest_index, "_source": values, "_id": str(id)}
else:
action = {"_index": es_dest_index, "_source": values}
actions.append(action)
n = n + 1
if n % chunksize == 0:
bulk(client=es_client, actions=actions, refresh=es_refresh)
actions = []
bulk(client=es_client, actions=actions, refresh=es_refresh)
return DataFrame(es_client, es_dest_index)
def eland_to_pandas(ed_df: DataFrame, show_progress: bool = False) -> pd.DataFrame:
"""
Convert an eland.Dataframe to a pandas.DataFrame
**Note: this loads the entire Elasticsearch index into in core pandas.DataFrame structures. For large
indices this can create significant load on the Elasticsearch cluster and require signficant memory**
Parameters
----------
ed_df: eland.DataFrame
The source eland.Dataframe referencing the Elasticsearch index
show_progress: bool
Output progress of option to stdout? By default False.
Returns
-------
pandas.Dataframe
pandas.DataFrame contains all rows and columns in eland.DataFrame
Examples
--------
>>> ed_df = ed.DataFrame('localhost', 'flights').head()
>>> type(ed_df)
<class 'eland.dataframe.DataFrame'>
>>> ed_df
AvgTicketPrice Cancelled ... dayOfWeek timestamp
0 841.265642 False ... 0 2018-01-01 00:00:00
1 882.982662 False ... 0 2018-01-01 18:27:00
2 190.636904 False ... 0 2018-01-01 17:11:14
3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00
<BLANKLINE>
[5 rows x 27 columns]
Convert `eland.DataFrame` to `pandas.DataFrame` (Note: this loads entire Elasticsearch index into core memory)
>>> pd_df = ed.eland_to_pandas(ed_df)
>>> type(pd_df)
<class 'pandas.core.frame.DataFrame'>
>>> pd_df
AvgTicketPrice Cancelled ... dayOfWeek timestamp
0 841.265642 False ... 0 2018-01-01 00:00:00
1 882.982662 False ... 0 2018-01-01 18:27:00
2 190.636904 False ... 0 2018-01-01 17:11:14
3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00
<BLANKLINE>
[5 rows x 27 columns]
Convert `eland.DataFrame` to `pandas.DataFrame` and show progress every 10000 rows
>>> pd_df = ed.eland_to_pandas(ed.DataFrame('localhost', 'flights'), show_progress=True) # doctest: +SKIP
2020-01-29 12:43:36.572395: read 10000 rows
2020-01-29 12:43:37.309031: read 13059 rows
See Also
--------
eland.read_es: Create an eland.Dataframe from an Elasticsearch index
eland.pandas_to_eland: Create an eland.Dataframe from pandas.DataFrame
"""
return ed_df._to_pandas(show_progress=show_progress)
def read_csv(
filepath_or_buffer,
es_client,
es_dest_index,
es_if_exists="fail",
es_refresh=False,
es_dropna=False,
es_type_overrides=None,
sep=",",
delimiter=None,
# Column and Index Locations and Names
header="infer",
names=None,
index_col=None,
usecols=None,
squeeze=False,
prefix=None,
mangle_dupe_cols=True,
# General Parsing Configuration
dtype=None,
engine=None,
converters=None,
true_values=None,
false_values=None,
skipinitialspace=False,
skiprows=None,
skipfooter=0,
nrows=None,
# Iteration
# iterator=False,
chunksize=None,
# NA and Missing Data Handling
na_values=None,
keep_default_na=True,
na_filter=True,
verbose=False,
skip_blank_lines=True,
# Datetime Handling
parse_dates=False,
infer_datetime_format=False,
keep_date_col=False,
date_parser=None,
dayfirst=False,
cache_dates=True,
# Quoting, Compression, and File Format
compression="infer",
thousands=None,
decimal=b".",
lineterminator=None,
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
doublequote=True,
escapechar=None,
comment=None,
encoding=None,
dialect=None,
# Error Handling
error_bad_lines=True,
warn_bad_lines=True,
# Internal
delim_whitespace=False,
low_memory=_c_parser_defaults["low_memory"],
memory_map=False,
float_precision=None,
):
"""
Read a comma-separated values (csv) file into eland.DataFrame (i.e. an Elasticsearch index).
**Modifies an Elasticsearch index**
**Note pandas iteration options not supported**
Parameters
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance
es_dest_index: str
Name of Elasticsearch index to be appended to
es_if_exists : {'fail', 'replace', 'append'}, default 'fail'
How to behave if the index already exists.
- fail: Raise a ValueError.
- replace: Delete the index before inserting new values.
- append: Insert new values to the existing index. Create if does not exist.
es_dropna: bool, default 'False'
* True: Remove missing values (see pandas.Series.dropna)
* False: Include missing values - may cause bulk to fail
es_type_overrides: dict, default None
Dict of columns: es_type to override default es datatype mappings
chunksize
number of csv rows to read before bulk index into Elasticsearch
Other Parameters
----------------
Parameters derived from :pandas_api_docs:`pandas.read_csv`.
See Also
--------
:pandas_api_docs:`pandas.read_csv`
Notes
-----
iterator not supported
Examples
--------
See if 'churn' index exists in Elasticsearch
>>> from elasticsearch import Elasticsearch # doctest: +SKIP
>>> es = Elasticsearch() # doctest: +SKIP
>>> es.indices.exists(index="churn") # doctest: +SKIP
False
Read 'churn.csv' and use first column as _id (and eland.DataFrame index)
::
# churn.csv
,state,account length,area code,phone number,international plan,voice mail plan,number vmail messages,total day minutes,total day calls,total day charge,total eve minutes,total eve calls,total eve charge,total night minutes,total night calls,total night charge,total intl minutes,total intl calls,total intl charge,customer service calls,churn
0,KS,128,415,382-4657,no,yes,25,265.1,110,45.07,197.4,99,16.78,244.7,91,11.01,10.0,3,2.7,1,0
1,OH,107,415,371-7191,no,yes,26,161.6,123,27.47,195.5,103,16.62,254.4,103,11.45,13.7,3,3.7,1,0
...
>>> ed.read_csv("churn.csv",
... es_client='localhost',
... es_dest_index='churn',
... es_refresh=True,
... index_col=0) # doctest: +SKIP
account length area code churn customer service calls ... total night calls total night charge total night minutes voice mail plan
0 128 415 0 1 ... 91 11.01 244.7 yes
1 107 415 0 1 ... 103 11.45 254.4 yes
2 137 415 0 0 ... 104 7.32 162.6 no
3 84 408 0 2 ... 89 8.86 196.9 no
4 75 415 0 3 ... 121 8.41 186.9 no
... ... ... ... ... ... ... ... ... ...
3328 192 415 0 2 ... 83 12.56 279.1 yes
3329 68 415 0 3 ... 123 8.61 191.3 no
3330 28 510 0 2 ... 91 8.64 191.9 no
3331 184 510 0 2 ... 137 6.26 139.2 no
3332 74 415 0 0 ... 77 10.86 241.4 yes
<BLANKLINE>
[3333 rows x 21 columns]
Validate data now exists in 'churn' index:
>>> es.search(index="churn", size=1) # doctest: +SKIP
{'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 3333, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'churn', '_id': '0', '_score': 1.0, '_source': {'state': 'KS', 'account length': 128, 'area code': 415, 'phone number': '382-4657', 'international plan': 'no', 'voice mail plan': 'yes', 'number vmail messages': 25, 'total day minutes': 265.1, 'total day calls': 110, 'total day charge': 45.07, 'total eve minutes': 197.4, 'total eve calls': 99, 'total eve charge': 16.78, 'total night minutes': 244.7, 'total night calls': 91, 'total night charge': 11.01, 'total intl minutes': 10.0, 'total intl calls': 3, 'total intl charge': 2.7, 'customer service calls': 1, 'churn': 0}}]}}
TODO - currently the eland.DataFrame may not retain the order of the data in the csv.
"""
kwds = dict()
kwds.update(
sep=sep,
delimiter=delimiter,
engine=engine,
dialect=dialect,
compression=compression,
# engine_specified=engine_specified,
doublequote=doublequote,
escapechar=escapechar,
quotechar=quotechar,
quoting=quoting,
skipinitialspace=skipinitialspace,
lineterminator=lineterminator,
header=header,
index_col=index_col,
names=names,
prefix=prefix,
skiprows=skiprows,
skipfooter=skipfooter,
na_values=na_values,
true_values=true_values,
false_values=false_values,
keep_default_na=keep_default_na,
thousands=thousands,
comment=comment,
decimal=decimal,
parse_dates=parse_dates,
keep_date_col=keep_date_col,
dayfirst=dayfirst,
date_parser=date_parser,
cache_dates=cache_dates,
nrows=nrows,
# iterator=iterator,
chunksize=chunksize,
converters=converters,
dtype=dtype,
usecols=usecols,
verbose=verbose,
encoding=encoding,
squeeze=squeeze,
memory_map=memory_map,
float_precision=float_precision,
na_filter=na_filter,
delim_whitespace=delim_whitespace,
warn_bad_lines=warn_bad_lines,
error_bad_lines=error_bad_lines,
low_memory=low_memory,
mangle_dupe_cols=mangle_dupe_cols,
infer_datetime_format=infer_datetime_format,
skip_blank_lines=skip_blank_lines,
)
if chunksize is None:
kwds.update(chunksize=DEFAULT_CHUNK_SIZE)
# read csv in chunks to pandas DataFrame and dump to eland DataFrame (and Elasticsearch)
reader = pd.read_csv(filepath_or_buffer, **kwds)
first_write = True
for chunk in reader:
if first_write:
pandas_to_eland(
chunk,
es_client,
es_dest_index,
es_if_exists=es_if_exists,
chunksize=chunksize,
es_refresh=es_refresh,
es_dropna=es_dropna,
es_type_overrides=es_type_overrides,
)
first_write = False
else:
pandas_to_eland(
chunk,
es_client,
es_dest_index,
es_if_exists="append",
chunksize=chunksize,
es_refresh=es_refresh,
es_dropna=es_dropna,
es_type_overrides=es_type_overrides,
)
# Now create an eland.DataFrame that references the new index
return DataFrame(es_client, es_dest_index)
return wrapper

View File

@ -25,6 +25,7 @@ TYPED_FILES = {
"eland/actions.py",
"eland/arithmetics.py",
"eland/common.py",
"eland/etl.py",
"eland/filter.py",
"eland/index.py",
"eland/query.py",
@ -106,7 +107,7 @@ def docs(session):
session.run("pandoc", "--version", external=True)
session.install("-r", "docs/requirements-docs.txt")
session.run("python", "setup.py", "install")
session.install(".")
# See if we have an Elasticsearch cluster active
# to rebuild the Jupyter notebooks with.