Merge pull request #46 from stevedodson/master

Adding eland.read_csv
This commit is contained in:
stevedodson 2019-11-15 16:16:26 +01:00 committed by GitHub
commit 905fba5d0f
8 changed files with 250 additions and 34 deletions


@@ -26,5 +26,7 @@ In general, the data resides in elasticsearch and not in memory, which allows el
* :doc:`reference/index`
* :doc:`reference/io`
* :doc:`reference/general_utility_functions`
* :doc:`reference/dataframe`
* :doc:`reference/index`


@@ -0,0 +1,6 @@
eland.read_csv
==============
.. currentmodule:: eland
.. autofunction:: read_csv
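
A minimal usage sketch (the file path, host and index name below are illustrative, not taken from this change)::

    import eland as ed

    ed_df = ed.read_csv('flights.csv', 'localhost', 'flights-from-csv',
                        es_refresh=True)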


@@ -10,6 +10,7 @@ methods. All classes and functions exposed in ``eland.*`` namespace are public.
.. toctree::
:maxdepth: 2
io
general_utility_functions
dataframe
indexing


@@ -9,6 +9,8 @@ Index
that contain an index (Series/DataFrame) and those should most likely be
used before calling these methods directly.**
Constructor
~~~~~~~~~~~
.. autosummary::
:toctree: api/


@@ -0,0 +1,13 @@
.. _api.io:
============
Input/Output
============
.. currentmodule:: eland
Flat File
~~~~~~~~~
.. autosummary::
:toctree: api/
read_csv


@@ -431,4 +431,3 @@ class ElandQueryCompiler:
return result
# def isna(self):


@@ -1,13 +1,23 @@
# File called _pytest for PyCharm compatibility
import ast
import time
import eland as ed
from elasticsearch import Elasticsearch
import pandas as pd
from pandas.util.testing import (assert_frame_equal)
from pandas.util.testing import assert_frame_equal
from eland.tests.common import ROOT_DIR
from eland.tests.common import TestData
from eland.tests import ELASTICSEARCH_HOST
from eland.tests import FLIGHTS_INDEX_NAME
from eland.tests.common import assert_pandas_eland_frame_equal
class TestDataFrameToCSV(TestData):
@@ -42,3 +52,21 @@ class TestDataFrameToCSV(TestData):
pd_from_csv.timestamp = pd.to_datetime(pd_from_csv.timestamp)
assert_frame_equal(pd_flights, pd_from_csv)
# Now read the csv to an index
now_millis = int(round(time.time() * 1000))
test_index = FLIGHTS_INDEX_NAME + '.' + str(now_millis)
es = Elasticsearch(ELASTICSEARCH_HOST)
ed_flights_from_csv = ed.read_csv(results_file, es, test_index, index_col=0, es_refresh=True,
es_geo_points=['OriginLocation', 'DestLocation'],
converters={
'DestLocation': lambda x: ast.literal_eval(x),
'OriginLocation': lambda x: ast.literal_eval(x)}
)
pd_flights_from_csv = ed.eland_to_pandas(ed_flights_from_csv)
# TODO - there is a 'bug' where the Elasticsearch index returns data in a different order to the CSV
print(ed_flights_from_csv.head())
print(pd_flights_from_csv.head())
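# A possible cleanup step (a sketch, not part of this change): delete the
# throwaway index created above once the comparison is done.
es.indices.delete(index=test_index)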


@@ -1,8 +1,14 @@
import pandas as pd
import csv
from pandas.io.parsers import _c_parser_defaults
from eland import Client
from eland import DataFrame
from eland import Mappings
import pandas as pd
_default_chunk_size = 10000
def read_es(es_params, index_pattern):
@@ -31,7 +37,10 @@ def read_es(es_params, index_pattern):
"""
return DataFrame(client=es_params, index_pattern=index_pattern)
def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk_size=10000, refresh=False, dropna=False,
def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunksize=None,
refresh=False,
dropna=False,
geo_points=None):
"""
Append a pandas DataFrame to an Elasticsearch index.
@@ -52,6 +61,8 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk
- fail: Raise a ValueError.
- replace: Delete the index before inserting new values.
- append: Insert new values to the existing index. Create if does not exist.
refresh: bool, default 'False'
Refresh destination_index after bulk index
dropna: bool, default 'False'
* True: Remove missing values (see pandas.Series.dropna)
* False: Include missing values - may cause bulk to fail
@@ -68,6 +79,9 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk
eland.read_es: Create an eland.Dataframe from an Elasticsearch index
eland.eland_to_pandas: Create a pandas.Dataframe from eland.DataFrame
"""
if chunksize is None:
chunksize = _default_chunk_size
client = Client(es_params)
mapping = Mappings._generate_es_mappings(pd_df, geo_points)
@@ -108,7 +122,7 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk
n = n + 1
if n % chunk_size == 0:
if n % chunksize == 0:
client.bulk(actions, refresh=refresh)
actions = []
@@ -118,6 +132,7 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk
return ed_df
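# A minimal usage sketch for pandas_to_eland (a sketch, not part of this change;
# it assumes an Elasticsearch instance reachable at 'localhost' and that
# pandas_to_eland is importable from the top-level eland namespace; the index
# name and data are illustrative):
import pandas as pd
import eland as ed
df = pd.DataFrame({'city': ['London'], 'location': [[-0.1278, 51.5074]]})
ed_df = ed.pandas_to_eland(df, 'localhost', 'cities-demo',
                           if_exists='replace', refresh=True,
                           geo_points=['location'])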
def eland_to_pandas(ed_df):
"""
Convert an eland.Dataframe to a pandas.DataFrame
@@ -142,36 +157,186 @@ def eland_to_pandas(ed_df):
"""
return ed_df._to_pandas()
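# A minimal round-trip sketch for eland_to_pandas (a sketch, not part of this
# change; 'flights' is assumed to be an existing Elasticsearch index):
import eland as ed
ed_df = ed.read_es('localhost', 'flights')
pd_df = ed.eland_to_pandas(ed_df)  # materialises the whole index as a pandas.DataFrame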
def _inherit_docstrings(parent, excluded=[]):
"""Creates a decorator which overwrites a decorated class' __doc__
attribute with parent's __doc__ attribute. Also overwrites __doc__ of
methods and properties defined in the class with the __doc__ of matching
methods and properties in parent.
Args:
parent (object): Class from which the decorated class inherits __doc__.
excluded (list): List of parent objects from which the class does not
inherit docstrings.
Returns:
function: decorator which replaces the decorated class' documentation
with parent's documentation.
"""
def decorator(cls):
if parent not in excluded:
cls.__doc__ = parent.__doc__
for attr, obj in cls.__dict__.items():
parent_obj = getattr(parent, attr, None)
if parent_obj in excluded or (
not callable(parent_obj) and not isinstance(parent_obj, property)
):
continue
if callable(obj):
obj.__doc__ = parent_obj.__doc__
elif isinstance(obj, property) and obj.fget is not None:
p = property(obj.fget, obj.fset, obj.fdel, parent_obj.__doc__)
setattr(cls, attr, p)
return cls
return decorator
def read_csv(filepath_or_buffer,
es_client,
es_dest_index,
es_if_exists='fail',
es_refresh=False,
es_dropna=False,
es_geo_points=None,
sep=",",
delimiter=None,
# Column and Index Locations and Names
header="infer",
names=None,
index_col=None,
usecols=None,
squeeze=False,
prefix=None,
mangle_dupe_cols=True,
# General Parsing Configuration
dtype=None,
engine=None,
converters=None,
true_values=None,
false_values=None,
skipinitialspace=False,
skiprows=None,
skipfooter=0,
nrows=None,
# Iteration
# iterator=False,
chunksize=None,
# NA and Missing Data Handling
na_values=None,
keep_default_na=True,
na_filter=True,
verbose=False,
skip_blank_lines=True,
# Datetime Handling
parse_dates=False,
infer_datetime_format=False,
keep_date_col=False,
date_parser=None,
dayfirst=False,
cache_dates=True,
# Quoting, Compression, and File Format
compression="infer",
thousands=None,
decimal=b".",
lineterminator=None,
quotechar='"',
quoting=csv.QUOTE_MINIMAL,
doublequote=True,
escapechar=None,
comment=None,
encoding=None,
dialect=None,
# Error Handling
error_bad_lines=True,
warn_bad_lines=True,
# Internal
delim_whitespace=False,
low_memory=_c_parser_defaults["low_memory"],
memory_map=False,
float_precision=None):
"""
Read a comma-separated values (csv) file into eland.DataFrame (i.e. an Elasticsearch index).
**Modifies an Elasticsearch index**
**Note: iteration is not supported**
Parameters
----------
es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or
- elasticsearch-py instance or
- eland.Client instance
es_dest_index: str
Name of Elasticsearch index to be appended to
es_if_exists : {'fail', 'replace', 'append'}, default 'fail'
How to behave if the index already exists.
- fail: Raise a ValueError.
- replace: Delete the index before inserting new values.
- append: Insert new values to the existing index. Create if does not exist.
es_dropna: bool, default 'False'
* True: Remove missing values (see pandas.Series.dropna)
* False: Include missing values - may cause bulk to fail
es_geo_points: list, default None
List of columns to map to geo_point data type
iterator
ignored
chunksize
Number of csv rows to read before bulk indexing into Elasticsearch
Other Parameters
----------------
Parameters derived from :pandas_api_docs:`read_csv`.
See Also
--------
:pandas_api_docs:`read_csv` - for all parameters
Notes
-----
TODO - currently the eland.DataFrame may not retain the order of the data in the csv.
"""
kwds = dict()
kwds.update(
delimiter=delimiter,
engine=engine,
dialect=dialect,
compression=compression,
# engine_specified=engine_specified,
doublequote=doublequote,
escapechar=escapechar,
quotechar=quotechar,
quoting=quoting,
skipinitialspace=skipinitialspace,
lineterminator=lineterminator,
header=header,
index_col=index_col,
names=names,
prefix=prefix,
skiprows=skiprows,
skipfooter=skipfooter,
na_values=na_values,
true_values=true_values,
false_values=false_values,
keep_default_na=keep_default_na,
thousands=thousands,
comment=comment,
decimal=decimal,
parse_dates=parse_dates,
keep_date_col=keep_date_col,
dayfirst=dayfirst,
date_parser=date_parser,
cache_dates=cache_dates,
nrows=nrows,
# iterator=iterator,
chunksize=chunksize,
converters=converters,
dtype=dtype,
usecols=usecols,
verbose=verbose,
encoding=encoding,
squeeze=squeeze,
memory_map=memory_map,
float_precision=float_precision,
na_filter=na_filter,
delim_whitespace=delim_whitespace,
warn_bad_lines=warn_bad_lines,
error_bad_lines=error_bad_lines,
low_memory=low_memory,
mangle_dupe_cols=mangle_dupe_cols,
infer_datetime_format=infer_datetime_format,
skip_blank_lines=skip_blank_lines,
)
if chunksize is None:
kwds.update(chunksize=_default_chunk_size)
client = Client(es_client)
# read csv in chunks to pandas DataFrame and dump to eland DataFrame (and Elasticsearch)
reader = pd.read_csv(filepath_or_buffer, **kwds)
first_write = True
for chunk in reader:
if first_write:
pandas_to_eland(chunk, client, es_dest_index, if_exists=es_if_exists, chunksize=chunksize,
refresh=es_refresh, dropna=es_dropna, geo_points=es_geo_points)
first_write = False
else:
pandas_to_eland(chunk, client, es_dest_index, if_exists='append', chunksize=chunksize,
refresh=es_refresh, dropna=es_dropna, geo_points=es_geo_points)
# Now create an eland.DataFrame that references the new index
ed_df = DataFrame(client, es_dest_index)
return ed_df
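# A fuller usage sketch of read_csv (a sketch, not part of this change): the
# es_* parameters control the Elasticsearch side of the operation, everything
# else is forwarded to pandas.read_csv. File, host and index name below are
# illustrative.
import eland as ed
ed_df = ed.read_csv('flights.csv',
                    'localhost',
                    'flights-from-csv',
                    es_if_exists='replace',
                    es_refresh=True,
                    index_col=0,
                    chunksize=5000)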