diff --git a/docs/source/examples/demo_notebook.ipynb b/docs/source/examples/demo_notebook.ipynb
index 2209460..bc9a13c 100644
--- a/docs/source/examples/demo_notebook.ipynb
+++ b/docs/source/examples/demo_notebook.ipynb
@@ -753,7 +753,7 @@
    {
     "data": {
      "text/plain": [
-      ""
+      ""
      ]
     },
     "execution_count": 17,
@@ -2707,7 +2707,7 @@
      " 410.008918\n",
      " 2470.545974\n",
      " ...\n",
-      " 251.944994\n",
+      " 251.698552\n",
      " 1.000000\n",
      " \n",
      " \n",
@@ -2715,16 +2715,16 @@
      " 640.387285\n",
      " 7612.072403\n",
      " ...\n",
-      " 502.986750\n",
+      " 503.148975\n",
      " 3.000000\n",
      " \n",
      " \n",
      " 75%\n",
-      " 842.272763\n",
-      " 9735.860651\n",
+      " 842.233478\n",
+      " 9735.660463\n",
      " ...\n",
-      " 720.505705\n",
-      " 4.246711\n",
+      " 720.534532\n",
+      " 4.254967\n",
      " \n",
      " \n",
      " max\n",
@@ -2745,9 +2745,9 @@
      "mean 628.253689 7092.142457 ... 511.127842 2.835975\n",
      "std 266.386661 4578.263193 ... 334.741135 1.939365\n",
      "min 100.020531 0.000000 ... 0.000000 0.000000\n",
-      "25% 410.008918 2470.545974 ... 251.944994 1.000000\n",
-      "50% 640.387285 7612.072403 ... 502.986750 3.000000\n",
-      "75% 842.272763 9735.860651 ... 720.505705 4.246711\n",
+      "25% 410.008918 2470.545974 ... 251.698552 1.000000\n",
+      "50% 640.387285 7612.072403 ... 503.148975 3.000000\n",
+      "75% 842.233478 9735.660463 ... 720.534532 4.254967\n",
      "max 1199.729004 19881.482422 ... 1902.901978 6.000000\n",
      "\n",
      "[8 rows x 7 columns]"
diff --git a/docs/source/examples/online_retail_analysis.ipynb b/docs/source/examples/online_retail_analysis.ipynb
index f2e8e26..a8ed6c5 100644
--- a/docs/source/examples/online_retail_analysis.ipynb
+++ b/docs/source/examples/online_retail_analysis.ipynb
@@ -1023,20 +1023,20 @@
      " \n",
      " \n",
      " 25%\n",
-      " 14225.075800\n",
+      " 14215.123301\n",
      " 1.000000\n",
-      " 1.250000\n",
+      " 1.250100\n",
      " \n",
      " \n",
      " 50%\n",
-      " 15667.359184\n",
+      " 15654.828552\n",
      " 2.000000\n",
      " 2.510000\n",
      " \n",
      " \n",
      " 75%\n",
-      " 17212.690092\n",
-      " 6.552523\n",
+      " 17218.003301\n",
+      " 6.570576\n",
      " 4.210000\n",
      " \n",
      " \n",
@@ -1055,9 +1055,9 @@
      "mean 15590.776680 7.464000 4.103233\n",
      "std 1764.025160 85.924387 20.104873\n",
      "min 12347.000000 -9360.000000 0.000000\n",
-      "25% 14225.075800 1.000000 1.250000\n",
-      "50% 15667.359184 2.000000 2.510000\n",
-      "75% 17212.690092 6.552523 4.210000\n",
+      "25% 14215.123301 1.000000 1.250100\n",
+      "50% 15654.828552 2.000000 2.510000\n",
+      "75% 17218.003301 6.570576 4.210000\n",
      "max 18239.000000 2880.000000 950.990000"
      ]
     },
diff --git a/docs/source/implementation/dataframe_supported.rst b/docs/source/implementation/dataframe_supported.rst
index 8ea727e..172cd2c 100644
--- a/docs/source/implementation/dataframe_supported.rst
+++ b/docs/source/implementation/dataframe_supported.rst
@@ -6,7 +6,7 @@ pandas.DataFrame supported APIs
 
 The following table lists both implemented and not implemented methods. If you have need
 of an operation that is listed as not implemented, feel free to open an issue on the
-http://github/elastic/eland, or give a thumbs up to already created issues. Contributions are
+http://github.com/elastic/eland, or give a thumbs up to already created issues. Contributions are
 also welcome!
 
 The following table is structured as follows: The first column contains the method name.
diff --git a/eland/tests/dataframe/test_datetime_pytest.py b/eland/tests/dataframe/test_datetime_pytest.py
index 0c72495..2d45f35 100644
--- a/eland/tests/dataframe/test_datetime_pytest.py
+++ b/eland/tests/dataframe/test_datetime_pytest.py
@@ -95,7 +95,7 @@ class TestDataFrameDateTime(TestData):
         # Now create index
         index_name = 'eland_test_generate_es_mappings'
 
-        ed_df = ed.pandas_to_eland(df, ES_TEST_CLIENT, index_name, if_exists="replace", refresh=True)
+        ed_df = ed.pandas_to_eland(df, ES_TEST_CLIENT, index_name, es_if_exists="replace", es_refresh=True)
         ed_df_head = ed_df.head()
 
         print(df.to_string())
diff --git a/eland/tests/dataframe/test_query_pytest.py b/eland/tests/dataframe/test_query_pytest.py
index 309c30e..d588f4e 100644
--- a/eland/tests/dataframe/test_query_pytest.py
+++ b/eland/tests/dataframe/test_query_pytest.py
@@ -41,7 +41,7 @@ class TestDataFrameQuery(TestData):
         # Now create index
         index_name = 'eland_test_query'
 
-        ed_df = ed.pandas_to_eland(pd_df, ES_TEST_CLIENT, index_name, if_exists="replace", refresh=True)
+        ed_df = ed.pandas_to_eland(pd_df, ES_TEST_CLIENT, index_name, es_if_exists="replace", es_refresh=True)
 
         assert_pandas_eland_frame_equal(pd_df, ed_df)
 
@@ -97,7 +97,7 @@ class TestDataFrameQuery(TestData):
         # Now create index
         index_name = 'eland_test_query'
 
-        ed_df = ed.pandas_to_eland(pd_df, ES_TEST_CLIENT, index_name, if_exists="replace", refresh=True)
+        ed_df = ed.pandas_to_eland(pd_df, ES_TEST_CLIENT, index_name, es_if_exists="replace", es_refresh=True)
 
         assert_pandas_eland_frame_equal(pd_df, ed_df)
 
diff --git a/eland/tests/dataframe/test_utils_pytest.py b/eland/tests/dataframe/test_utils_pytest.py
index 5c0b86c..8edb1e5 100644
--- a/eland/tests/dataframe/test_utils_pytest.py
+++ b/eland/tests/dataframe/test_utils_pytest.py
@@ -50,7 +50,7 @@ class TestDataFrameUtils(TestData):
         # Now create index
         index_name = 'eland_test_generate_es_mappings'
 
-        ed_df = ed.pandas_to_eland(df, ES_TEST_CLIENT, index_name, if_exists="replace", refresh=True)
+        ed_df = ed.pandas_to_eland(df, ES_TEST_CLIENT, index_name, es_if_exists="replace", es_refresh=True)
         ed_df_head = ed_df.head()
 
         assert_pandas_eland_frame_equal(df, ed_df_head)
diff --git a/eland/utils.py b/eland/utils.py
index b826886..721b994 100644
--- a/eland/utils.py
+++ b/eland/utils.py
@@ -24,7 +24,7 @@ from eland import FieldMappings
 DEFAULT_CHUNK_SIZE = 10000
 
 
-def read_es(es_params, index_pattern):
+def read_es(es_client, es_index_pattern):
     """
     Utility method to create an eland.Dataframe from an Elasticsearch index_pattern.
     (Similar to pandas.read_csv, but source data is an Elasticsearch index rather than
@@ -32,11 +32,11 @@ def read_es(es_params, index_pattern):
     Parameters
     ----------
-    es_params: Elasticsearch client argument(s)
+    es_client: Elasticsearch client argument(s)
         - elasticsearch-py parameters or
         - elasticsearch-py instance or
         - eland.Client instance
-    index_pattern: str
+    es_index_pattern: str
         Elasticsearch index pattern
 
     Returns
     -------
@@ -48,13 +48,17 @@
     eland.pandas_to_eland: Create an eland.Dataframe from pandas.DataFrame
     eland.eland_to_pandas: Create a pandas.Dataframe from eland.DataFrame
     """
-    return DataFrame(client=es_params, index_pattern=index_pattern)
+    return DataFrame(client=es_client, index_pattern=es_index_pattern)
 
 
-def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunksize=None,
-                    refresh=False,
-                    dropna=False,
-                    geo_points=None):
+def pandas_to_eland(pd_df,
+                    es_client,
+                    es_dest_index,
+                    es_if_exists='fail',
+                    es_refresh=False,
+                    es_dropna=False,
+                    es_geo_points=None,
+                    chunksize=None):
     """
     Append a pandas DataFrame to an Elasticsearch index.
     Mainly used in testing.
@@ -62,31 +66,92 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk
     Parameters
     ----------
-    es_params: Elasticsearch client argument(s)
+    es_client: Elasticsearch client argument(s)
         - elasticsearch-py parameters or
         - elasticsearch-py instance or
         - eland.Client instance
-    destination_index: str
+    es_dest_index: str
         Name of Elasticsearch index to be appended to
-    if_exists : {'fail', 'replace', 'append'}, default 'fail'
+    es_if_exists : {'fail', 'replace', 'append'}, default 'fail'
         How to behave if the index already exists.
         - fail: Raise a ValueError.
         - replace: Delete the index before inserting new values.
         - append: Insert new values to the existing index. Create if does not exist.
-    refresh: bool, default 'False'
-        Refresh destination_index after bulk index
-    dropna: bool, default 'False'
+    es_refresh: bool, default 'False'
+        Refresh es_dest_index after bulk index
+    es_dropna: bool, default 'False'
         * True: Remove missing values (see pandas.Series.dropna)
         * False: Include missing values - may cause bulk to fail
-    geo_points: list, default None
+    es_geo_points: list, default None
         List of columns to map to geo_point data type
+    chunksize: int, default None
+        Number of pandas.DataFrame rows to read before bulk index into Elasticsearch
 
     Returns
     -------
     eland.Dataframe
         eland.DataFrame referencing data in destination_index
 
+    Examples
+    --------
+
+    >>> pd_df = pd.DataFrame(data={'A': 3.141,
+    ...                            'B': 1,
+    ...                            'C': 'foo',
+    ...                            'D': pd.Timestamp('20190102'),
+    ...                            'E': [1.0, 2.0, 3.0],
+    ...                            'F': False,
+    ...                            'G': [1, 2, 3]},
+    ...                      index=['0', '1', '2'])
+    >>> type(pd_df)
+    <class 'pandas.core.frame.DataFrame'>
+    >>> pd_df
+           A  B  ...      F  G
+    0  3.141  1  ...  False  1
+    1  3.141  1  ...  False  2
+    2  3.141  1  ...  False  3
+
+    [3 rows x 7 columns]
+    >>> pd_df.dtypes
+    A           float64
+    B             int64
+    C            object
+    D    datetime64[ns]
+    E           float64
+    F              bool
+    G             int64
+    dtype: object
+
+    Convert `pandas.DataFrame` to `eland.DataFrame` - this creates an Elasticsearch index called `pandas_to_eland`.
+    Overwrite the existing Elasticsearch index if it exists `es_if_exists="replace"`, and sync the index so it is
+    readable on return `es_refresh=True`
+
+
+    >>> ed_df = ed.pandas_to_eland(pd_df,
+    ...                            'localhost',
+    ...                            'pandas_to_eland',
+    ...                            es_if_exists="replace",
+    ...                            es_refresh=True)
+    >>> type(ed_df)
+    <class 'eland.dataframe.DataFrame'>
+    >>> ed_df
+           A  B  ...      F  G
+    0  3.141  1  ...  False  1
+    1  3.141  1  ...  False  2
+    2  3.141  1  ...  False  3
+
+    [3 rows x 7 columns]
+    >>> ed_df.dtypes
+    A           float64
+    B             int64
+    C            object
+    D    datetime64[ns]
+    E           float64
+    F              bool
+    G             int64
+    dtype: object
+
     See Also
     --------
     eland.read_es: Create an eland.Dataframe from an Elasticsearch index
@@ -95,26 +160,26 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk
     if chunksize is None:
         chunksize = DEFAULT_CHUNK_SIZE
 
-    client = Client(es_params)
+    client = Client(es_client)
 
-    mapping = FieldMappings._generate_es_mappings(pd_df, geo_points)
+    mapping = FieldMappings._generate_es_mappings(pd_df, es_geo_points)
 
     # If table exists, check if_exists parameter
-    if client.index_exists(index=destination_index):
-        if if_exists == "fail":
+    if client.index_exists(index=es_dest_index):
+        if es_if_exists == "fail":
             raise ValueError(
                 "Could not create the index [{0}] because it "
                 "already exists. "
                 "Change the if_exists parameter to "
-                "'append' or 'replace' data.".format(destination_index)
+                "'append' or 'replace' data.".format(es_dest_index)
             )
-        elif if_exists == "replace":
-            client.index_delete(index=destination_index)
-            client.index_create(index=destination_index, body=mapping)
+        elif es_if_exists == "replace":
+            client.index_delete(index=es_dest_index)
+            client.index_create(index=es_dest_index, body=mapping)
         # elif if_exists == "append":
         # TODO validate mapping are compatible
     else:
-        client.index_create(index=destination_index, body=mapping)
+        client.index_create(index=es_dest_index, body=mapping)
 
     # Now add data
     actions = []
@@ -123,25 +188,25 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk
         # Use index as _id
         id = row[0]
 
-        if dropna:
+        if es_dropna:
             values = row[1].dropna().to_dict()
         else:
             values = row[1].to_dict()
 
         # Use integer as id field for repeatable results
-        action = {'_index': destination_index, '_source': values, '_id': str(id)}
+        action = {'_index': es_dest_index, '_source': values, '_id': str(id)}
 
         actions.append(action)
 
         n = n + 1
 
         if n % chunksize == 0:
-            client.bulk(actions, refresh=refresh)
+            client.bulk(actions, refresh=es_refresh)
             actions = []
 
-    client.bulk(actions, refresh=refresh)
+    client.bulk(actions, refresh=es_refresh)
 
-    ed_df = DataFrame(client, destination_index)
+    ed_df = DataFrame(client, es_dest_index)
 
     return ed_df
 
@@ -163,6 +228,36 @@ def eland_to_pandas(ed_df):
     pandas.Dataframe
         pandas.DataFrame contains all rows and columns in eland.DataFrame
 
+    Examples
+    --------
+    >>> ed_df = ed.DataFrame('localhost', 'flights').head()
+    >>> type(ed_df)
+    <class 'eland.dataframe.DataFrame'>
+    >>> ed_df
+       AvgTicketPrice  Cancelled  ...  dayOfWeek           timestamp
+    0      841.265642      False  ...          0 2018-01-01 00:00:00
+    1      882.982662      False  ...          0 2018-01-01 18:27:00
+    2      190.636904      False  ...          0 2018-01-01 17:11:14
+    3      181.694216       True  ...          0 2018-01-01 10:33:28
+    4      730.041778      False  ...          0 2018-01-01 05:13:00
+
+    [5 rows x 27 columns]
+
+    Convert `eland.DataFrame` to `pandas.DataFrame` (Note: this loads the entire Elasticsearch index into core memory)
+
+    >>> pd_df = ed.eland_to_pandas(ed_df)
+    >>> type(pd_df)
+    <class 'pandas.core.frame.DataFrame'>
+    >>> pd_df
+       AvgTicketPrice  Cancelled  ...  dayOfWeek           timestamp
+    0      841.265642      False  ...          0 2018-01-01 00:00:00
+    1      882.982662      False  ...          0 2018-01-01 18:27:00
+    2      190.636904      False  ...          0 2018-01-01 17:11:14
+    3      181.694216       True  ...          0 2018-01-01 10:33:28
+    4      730.041778      False  ...          0 2018-01-01 05:13:00
+
+    [5 rows x 27 columns]
+
     See Also
     --------
     eland.read_es: Create an eland.Dataframe from an Elasticsearch index
@@ -275,6 +370,50 @@ def read_csv(filepath_or_buffer,
     -----
     iterator not supported
+    Examples
+    --------
+
+    See if 'churn' index exists in Elasticsearch
+
+    >>> from elasticsearch import Elasticsearch # doctest: +SKIP
+    >>> es = Elasticsearch() # doctest: +SKIP
+    >>> es.indices.exists(index="churn") # doctest: +SKIP
+    False
+
+    Read 'churn.csv' and use first column as _id (and eland.DataFrame index)
+    ::
+
+        # churn.csv
+        ,state,account length,area code,phone number,international plan,voice mail plan,number vmail messages,total day minutes,total day calls,total day charge,total eve minutes,total eve calls,total eve charge,total night minutes,total night calls,total night charge,total intl minutes,total intl calls,total intl charge,customer service calls,churn
+        0,KS,128,415,382-4657,no,yes,25,265.1,110,45.07,197.4,99,16.78,244.7,91,11.01,10.0,3,2.7,1,0
+        1,OH,107,415,371-7191,no,yes,26,161.6,123,27.47,195.5,103,16.62,254.4,103,11.45,13.7,3,3.7,1,0
+        ...
+
+    >>> ed.read_csv("churn.csv",
+    ...             es_client='localhost',
+    ...             es_dest_index='churn',
+    ...             es_refresh=True,
+    ...             index_col=0) # doctest: +SKIP
+          account length  area code  churn  customer service calls  ...  total night calls  total night charge  total night minutes voice mail plan
+    0                128        415      0                       1  ...                 91               11.01                244.7             yes
+    1                107        415      0                       1  ...                103               11.45                254.4             yes
+    2                137        415      0                       0  ...                104                7.32                162.6              no
+    3                 84        408      0                       2  ...                 89                8.86                196.9              no
+    4                 75        415      0                       3  ...                121                8.41                186.9              no
+    ...              ...        ...    ...                     ...  ...                ...                 ...                  ...             ...
+    3328             192        415      0                       2  ...                 83               12.56                279.1             yes
+    3329              68        415      0                       3  ...                123                8.61                191.3              no
+    3330              28        510      0                       2  ...                 91                8.64                191.9              no
+    3331             184        510      0                       2  ...                137                6.26                139.2              no
+    3332              74        415      0                       0  ...                 77               10.86                241.4             yes
+
+    [3333 rows x 21 columns]
+
+    Validate data now exists in 'churn' index:
+
+    >>> es.search(index="churn", size=1) # doctest: +SKIP
+    {'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 3333, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'churn', '_id': '0', '_score': 1.0, '_source': {'state': 'KS', 'account length': 128, 'area code': 415, 'phone number': '382-4657', 'international plan': 'no', 'voice mail plan': 'yes', 'number vmail messages': 25, 'total day minutes': 265.1, 'total day calls': 110, 'total day charge': 45.07, 'total eve minutes': 197.4, 'total eve calls': 99, 'total eve charge': 16.78, 'total night minutes': 244.7, 'total night calls': 91, 'total night charge': 11.01, 'total intl minutes': 10.0, 'total intl calls': 3, 'total intl charge': 2.7, 'customer service calls': 1, 'churn': 0}}]}}
+
     TODO - currently the eland.DataFrame may not retain the order of the data in the csv.
""" kwds = dict() @@ -342,12 +481,12 @@ def read_csv(filepath_or_buffer, first_write = True for chunk in reader: if first_write: - pandas_to_eland(chunk, client, es_dest_index, if_exists=es_if_exists, chunksize=chunksize, - refresh=es_refresh, dropna=es_dropna, geo_points=es_geo_points) + pandas_to_eland(chunk, client, es_dest_index, es_if_exists=es_if_exists, chunksize=chunksize, + es_refresh=es_refresh, es_dropna=es_dropna, es_geo_points=es_geo_points) first_write = False else: - pandas_to_eland(chunk, client, es_dest_index, if_exists='append', chunksize=chunksize, - refresh=es_refresh, dropna=es_dropna, geo_points=es_geo_points) + pandas_to_eland(chunk, client, es_dest_index, es_if_exists='append', chunksize=chunksize, + es_refresh=es_refresh, es_dropna=es_dropna, es_geo_points=es_geo_points) # Now create an eland.DataFrame that references the new index ed_df = DataFrame(client, es_dest_index) diff --git a/run_build.sh b/run_build.sh index c63a89b..32e4076 100755 --- a/run_build.sh +++ b/run_build.sh @@ -1,4 +1,4 @@ #!/usr/bin/env bash python -m eland.tests.setup_tests -pytest \ No newline at end of file +pytest