diff --git a/docs/source/index.rst b/docs/source/index.rst index 871dd61..bacd12b 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -26,5 +26,7 @@ In general, the data resides in elasticsearch and not in memory, which allows el * :doc:`reference/index` + * :doc:`reference/io` * :doc:`reference/general_utility_functions` * :doc:`reference/dataframe` + * :doc:`reference/index` diff --git a/docs/source/reference/api/eland.read_csv.rst b/docs/source/reference/api/eland.read_csv.rst new file mode 100644 index 0000000..c43cdec --- /dev/null +++ b/docs/source/reference/api/eland.read_csv.rst @@ -0,0 +1,6 @@ +eland.read_csv +============== + +.. currentmodule:: eland + +.. autofunction:: read_csv diff --git a/docs/source/reference/index.rst b/docs/source/reference/index.rst index a623800..b7c6bf4 100644 --- a/docs/source/reference/index.rst +++ b/docs/source/reference/index.rst @@ -10,6 +10,7 @@ methods. All classes and functions exposed in ``eland.*`` namespace are public. .. toctree:: :maxdepth: 2 + io general_utility_functions dataframe indexing diff --git a/docs/source/reference/indexing.rst b/docs/source/reference/indexing.rst index 1824209..9e4145f 100644 --- a/docs/source/reference/indexing.rst +++ b/docs/source/reference/indexing.rst @@ -9,6 +9,8 @@ Index that contain an index (Series/DataFrame) and those should most likely be used before calling these methods directly.** +Constructor +~~~~~~~~~~~ .. autosummary:: :toctree: api/ diff --git a/docs/source/reference/io.rst b/docs/source/reference/io.rst new file mode 100644 index 0000000..1d16d9c --- /dev/null +++ b/docs/source/reference/io.rst @@ -0,0 +1,13 @@ +.. _api.io: + +============ +Input/Output +============ +.. currentmodule:: eland + +Flat File +~~~~~~~~~ +.. 
autosummary:: + :toctree: api/ + + read_csv diff --git a/eland/query_compiler.py b/eland/query_compiler.py index ba6423a..1d24c5a 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -431,4 +431,3 @@ class ElandQueryCompiler: return result - # def isna(self): diff --git a/eland/tests/dataframe/test_to_csv_pytest.py b/eland/tests/dataframe/test_to_csv_pytest.py index 169bd19..27f7ba0 100644 --- a/eland/tests/dataframe/test_to_csv_pytest.py +++ b/eland/tests/dataframe/test_to_csv_pytest.py @@ -1,13 +1,23 @@ # File called _pytest for PyCharm compatability import ast +import time + +import eland as ed + +from elasticsearch import Elasticsearch import pandas as pd -from pandas.util.testing import (assert_frame_equal) +from pandas.util.testing import assert_frame_equal from eland.tests.common import ROOT_DIR from eland.tests.common import TestData +from eland.tests import ELASTICSEARCH_HOST +from eland.tests import FLIGHTS_INDEX_NAME + +from eland.tests.common import assert_pandas_eland_frame_equal + class TestDataFrameToCSV(TestData): @@ -42,3 +52,21 @@ class TestDataFrameToCSV(TestData): pd_from_csv.timestamp = pd.to_datetime(pd_from_csv.timestamp) assert_frame_equal(pd_flights, pd_from_csv) + + # Now read the csv to an index + now_millis = int(round(time.time() * 1000)) + + test_index = FLIGHTS_INDEX_NAME + '.' 
+ str(now_millis) + es = Elasticsearch(ELASTICSEARCH_HOST) + + ed_flights_from_csv = ed.read_csv(results_file, es, test_index, index_col=0, es_refresh=True, + es_geo_points=['OriginLocation', 'DestLocation'], + converters={ + 'DestLocation': lambda x: ast.literal_eval(x), + 'OriginLocation': lambda x: ast.literal_eval(x)} + ) + pd_flights_from_csv = ed.eland_to_pandas(ed_flights_from_csv) + + # TODO - there is a 'bug' where the Elasticsearch index returns data in a different order to the CSV + print(ed_flights_from_csv.head()) + print(pd_flights_from_csv.head()) diff --git a/eland/utils.py b/eland/utils.py index 463e8d4..962588e 100644 --- a/eland/utils.py +++ b/eland/utils.py @@ -1,8 +1,14 @@ +import pandas as pd +import csv + +from pandas.io.parsers import _c_parser_defaults + + from eland import Client from eland import DataFrame from eland import Mappings -import pandas as pd +_default_chunk_size = 10000 def read_es(es_params, index_pattern): @@ -31,7 +37,10 @@ def read_es(es_params, index_pattern): """ return DataFrame(client=es_params, index_pattern=index_pattern) -def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk_size=10000, refresh=False, dropna=False, + +def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunksize=None, + refresh=False, + dropna=False, geo_points=None): """ Append a pandas DataFrame to an Elasticsearch index. @@ -52,6 +61,8 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk - fail: Raise a ValueError. - replace: Delete the index before inserting new values. - append: Insert new values to the existing index. Create if does not exist. 
+ refresh: bool, default 'False' + Refresh destination_index after bulk index dropna: bool, default 'False' * True: Remove missing values (see pandas.Series.dropna) * False: Include missing values - may cause bulk to fail @@ -68,6 +79,9 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk eland.read_es: Create an eland.Dataframe from an Elasticsearch index eland.eland_to_pandas: Create a pandas.Dataframe from eland.DataFrame """ + if chunksize is None: + chunksize = _default_chunk_size + client = Client(es_params) mapping = Mappings._generate_es_mappings(pd_df, geo_points) @@ -108,7 +122,7 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk n = n + 1 - if n % chunk_size == 0: + if n % chunksize == 0: client.bulk(actions, refresh=refresh) actions = [] @@ -118,6 +132,7 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk return ed_df + def eland_to_pandas(ed_df): """ Convert an eland.Dataframe to a pandas.DataFrame @@ -142,36 +157,186 @@ def eland_to_pandas(ed_df): """ return ed_df._to_pandas() -def _inherit_docstrings(parent, excluded=[]): - """Creates a decorator which overwrites a decorated class' __doc__ - attribute with parent's __doc__ attribute. Also overwrites __doc__ of - methods and properties defined in the class with the __doc__ of matching - methods and properties in parent. - Args: - parent (object): Class from which the decorated class inherits __doc__. - excluded (list): List of parent objects from which the class does not - inherit docstrings. - - Returns: - function: decorator which replaces the decorated class' documentation - parent's documentation. 
def read_csv(filepath_or_buffer,
             es_client,
             es_dest_index,
             es_if_exists='fail',
             es_refresh=False,
             es_dropna=False,
             es_geo_points=None,
             sep=",",
             delimiter=None,
             # Column and Index Locations and Names
             header="infer",
             names=None,
             index_col=None,
             usecols=None,
             squeeze=False,
             prefix=None,
             mangle_dupe_cols=True,
             # General Parsing Configuration
             dtype=None,
             engine=None,
             converters=None,
             true_values=None,
             false_values=None,
             skipinitialspace=False,
             skiprows=None,
             skipfooter=0,
             nrows=None,
             # Iteration
             # iterator=False,
             chunksize=None,
             # NA and Missing Data Handling
             na_values=None,
             keep_default_na=True,
             na_filter=True,
             verbose=False,
             skip_blank_lines=True,
             # Datetime Handling
             parse_dates=False,
             infer_datetime_format=False,
             keep_date_col=False,
             date_parser=None,
             dayfirst=False,
             cache_dates=True,
             # Quoting, Compression, and File Format
             compression="infer",
             thousands=None,
             decimal=b".",
             lineterminator=None,
             quotechar='"',
             quoting=csv.QUOTE_MINIMAL,
             doublequote=True,
             escapechar=None,
             comment=None,
             encoding=None,
             dialect=None,
             # Error Handling
             error_bad_lines=True,
             warn_bad_lines=True,
             # Internal
             delim_whitespace=False,
             low_memory=_c_parser_defaults["low_memory"],
             memory_map=False,
             float_precision=None):
    """
    Read a comma-separated values (csv) file into eland.DataFrame (i.e. an Elasticsearch index).

    The file is parsed by pandas in chunks and every chunk is bulk indexed
    into ``es_dest_index`` via ``pandas_to_eland``.

    **Modifies an Elasticsearch index**

    **Note iteration not supported**

    Parameters
    ----------
    filepath_or_buffer:
        CSV source, passed unchanged to :pandas_api_docs:`read_csv`.
    es_client: Elasticsearch client argument(s)
        - elasticsearch-py parameters or
        - elasticsearch-py instance or
        - eland.Client instance
    es_dest_index: str
        Name of Elasticsearch index to be appended to
    es_if_exists : {'fail', 'replace', 'append'}, default 'fail'
        How to behave if the index already exists. Only applied to the first
        chunk; all following chunks are appended.

        - fail: Raise a ValueError.
        - replace: Delete the index before inserting new values.
        - append: Insert new values to the existing index. Create if does not exist.
    es_refresh: bool, default 'False'
        Refresh es_dest_index after bulk index
    es_dropna: bool, default 'False'
        * True: Remove missing values (see pandas.Series.dropna)
        * False: Include missing values - may cause bulk to fail
    es_geo_points: list, default None
        List of columns to map to geo_point data type
    sep: str, default ','
        Field separator. Forwarded to pandas only when ``delimiter`` is None.
    delimiter: str, default None
        Alias for ``sep``; takes precedence over ``sep`` when given.
    iterator
        ignored
    chunksize
        number of csv rows to read before bulk index into Elasticsearch.
        Defaults to the module level default chunk size when None.

    Other Parameters
    ----------------
    Parameters derived from :pandas_api_docs:`read_csv`.

    Returns
    -------
    eland.DataFrame
        DataFrame referencing ``es_dest_index``.

    See Also
    --------
    :pandas_api_docs:`read_csv` - for all parameters

    Notes
    -----
    TODO - currently the eland.DataFrame may not retain the order of the data in the csv.
    """
    # Resolve the chunk size once so pandas and the bulk indexer agree on it.
    if chunksize is None:
        chunksize = _default_chunk_size

    kwds = dict(
        engine=engine,
        dialect=dialect,
        compression=compression,
        # engine_specified=engine_specified,
        doublequote=doublequote,
        escapechar=escapechar,
        quotechar=quotechar,
        quoting=quoting,
        skipinitialspace=skipinitialspace,
        lineterminator=lineterminator,
        header=header,
        index_col=index_col,
        names=names,
        prefix=prefix,
        skiprows=skiprows,
        skipfooter=skipfooter,
        na_values=na_values,
        true_values=true_values,
        false_values=false_values,
        keep_default_na=keep_default_na,
        thousands=thousands,
        comment=comment,
        decimal=decimal,
        parse_dates=parse_dates,
        keep_date_col=keep_date_col,
        dayfirst=dayfirst,
        date_parser=date_parser,
        cache_dates=cache_dates,
        nrows=nrows,
        # iterator=iterator,
        chunksize=chunksize,
        converters=converters,
        dtype=dtype,
        usecols=usecols,
        verbose=verbose,
        encoding=encoding,
        squeeze=squeeze,
        memory_map=memory_map,
        float_precision=float_precision,
        na_filter=na_filter,
        delim_whitespace=delim_whitespace,
        warn_bad_lines=warn_bad_lines,
        error_bad_lines=error_bad_lines,
        low_memory=low_memory,
        mangle_dupe_cols=mangle_dupe_cols,
        infer_datetime_format=infer_datetime_format,
        skip_blank_lines=skip_blank_lines,
    )

    # ``sep`` used to be accepted but silently dropped. Forward exactly one of
    # the two aliases so pandas applies its own sep/delimiter resolution
    # (an explicit ``delimiter`` wins, otherwise ``sep`` is used).
    if delimiter is None:
        kwds.update(sep=sep)
    else:
        kwds.update(delimiter=delimiter)

    client = Client(es_client)

    # read csv in chunks to pandas DataFrame and dump to eland DataFrame (and Elasticsearch)
    reader = pd.read_csv(filepath_or_buffer, **kwds)

    # The caller's policy only applies to the first chunk; afterwards the index
    # exists, so every following chunk has to be appended.
    if_exists = es_if_exists
    try:
        for chunk in reader:
            pandas_to_eland(chunk, client, es_dest_index, if_exists=if_exists, chunksize=chunksize,
                            refresh=es_refresh, dropna=es_dropna, geo_points=es_geo_points)
            if_exists = 'append'
    finally:
        # Release the underlying file handle even if a bulk index call fails.
        reader.close()

    # Now create an eland.DataFrame that references the new index
    ed_df = DataFrame(client, es_dest_index)

    return ed_df