Improved read_csv docs + made 'to_eland' params consistent (#114)

* Improved read_csv docs + made 'to_eland' params consistent

Note, will change API.

* Removing additional args from pytest.

doctests + nbval tests in the CI are not addressed by
this PR.
This commit is contained in:
stevedodson 2020-01-16 10:17:49 +00:00 committed by GitHub
parent 1914644f93
commit 46b428d59b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 196 additions and 57 deletions

View File

@ -753,7 +753,7 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"<eland.index.Index at 0x11a604f50>" "<eland.index.Index at 0x11ffd7f90>"
] ]
}, },
"execution_count": 17, "execution_count": 17,
@ -2707,7 +2707,7 @@
" <td>410.008918</td>\n", " <td>410.008918</td>\n",
" <td>2470.545974</td>\n", " <td>2470.545974</td>\n",
" <td>...</td>\n", " <td>...</td>\n",
" <td>251.944994</td>\n", " <td>251.698552</td>\n",
" <td>1.000000</td>\n", " <td>1.000000</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
@ -2715,16 +2715,16 @@
" <td>640.387285</td>\n", " <td>640.387285</td>\n",
" <td>7612.072403</td>\n", " <td>7612.072403</td>\n",
" <td>...</td>\n", " <td>...</td>\n",
" <td>502.986750</td>\n", " <td>503.148975</td>\n",
" <td>3.000000</td>\n", " <td>3.000000</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>75%</th>\n", " <th>75%</th>\n",
" <td>842.272763</td>\n", " <td>842.233478</td>\n",
" <td>9735.860651</td>\n", " <td>9735.660463</td>\n",
" <td>...</td>\n", " <td>...</td>\n",
" <td>720.505705</td>\n", " <td>720.534532</td>\n",
" <td>4.246711</td>\n", " <td>4.254967</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>max</th>\n", " <th>max</th>\n",
@ -2745,9 +2745,9 @@
"mean 628.253689 7092.142457 ... 511.127842 2.835975\n", "mean 628.253689 7092.142457 ... 511.127842 2.835975\n",
"std 266.386661 4578.263193 ... 334.741135 1.939365\n", "std 266.386661 4578.263193 ... 334.741135 1.939365\n",
"min 100.020531 0.000000 ... 0.000000 0.000000\n", "min 100.020531 0.000000 ... 0.000000 0.000000\n",
"25% 410.008918 2470.545974 ... 251.944994 1.000000\n", "25% 410.008918 2470.545974 ... 251.698552 1.000000\n",
"50% 640.387285 7612.072403 ... 502.986750 3.000000\n", "50% 640.387285 7612.072403 ... 503.148975 3.000000\n",
"75% 842.272763 9735.860651 ... 720.505705 4.246711\n", "75% 842.233478 9735.660463 ... 720.534532 4.254967\n",
"max 1199.729004 19881.482422 ... 1902.901978 6.000000\n", "max 1199.729004 19881.482422 ... 1902.901978 6.000000\n",
"\n", "\n",
"[8 rows x 7 columns]" "[8 rows x 7 columns]"

View File

@ -1023,20 +1023,20 @@
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>25%</th>\n", " <th>25%</th>\n",
" <td>14225.075800</td>\n", " <td>14215.123301</td>\n",
" <td>1.000000</td>\n", " <td>1.000000</td>\n",
" <td>1.250000</td>\n", " <td>1.250100</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>50%</th>\n", " <th>50%</th>\n",
" <td>15667.359184</td>\n", " <td>15654.828552</td>\n",
" <td>2.000000</td>\n", " <td>2.000000</td>\n",
" <td>2.510000</td>\n", " <td>2.510000</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>75%</th>\n", " <th>75%</th>\n",
" <td>17212.690092</td>\n", " <td>17218.003301</td>\n",
" <td>6.552523</td>\n", " <td>6.570576</td>\n",
" <td>4.210000</td>\n", " <td>4.210000</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
@ -1055,9 +1055,9 @@
"mean 15590.776680 7.464000 4.103233\n", "mean 15590.776680 7.464000 4.103233\n",
"std 1764.025160 85.924387 20.104873\n", "std 1764.025160 85.924387 20.104873\n",
"min 12347.000000 -9360.000000 0.000000\n", "min 12347.000000 -9360.000000 0.000000\n",
"25% 14225.075800 1.000000 1.250000\n", "25% 14215.123301 1.000000 1.250100\n",
"50% 15667.359184 2.000000 2.510000\n", "50% 15654.828552 2.000000 2.510000\n",
"75% 17212.690092 6.552523 4.210000\n", "75% 17218.003301 6.570576 4.210000\n",
"max 18239.000000 2880.000000 950.990000" "max 18239.000000 2880.000000 950.990000"
] ]
}, },

View File

@ -6,7 +6,7 @@ pandas.DataFrame supported APIs
The following table lists both implemented and not implemented methods. If you have need The following table lists both implemented and not implemented methods. If you have need
of an operation that is listed as not implemented, feel free to open an issue on the of an operation that is listed as not implemented, feel free to open an issue on the
http://github/elastic/eland, or give a thumbs up to already created issues. Contributions are http://github.com/elastic/eland, or give a thumbs up to already created issues. Contributions are
also welcome! also welcome!
The following table is structured as follows: The first column contains the method name. The following table is structured as follows: The first column contains the method name.

View File

@ -95,7 +95,7 @@ class TestDataFrameDateTime(TestData):
# Now create index # Now create index
index_name = 'eland_test_generate_es_mappings' index_name = 'eland_test_generate_es_mappings'
ed_df = ed.pandas_to_eland(df, ES_TEST_CLIENT, index_name, if_exists="replace", refresh=True) ed_df = ed.pandas_to_eland(df, ES_TEST_CLIENT, index_name, es_if_exists="replace", es_refresh=True)
ed_df_head = ed_df.head() ed_df_head = ed_df.head()
print(df.to_string()) print(df.to_string())

View File

@ -41,7 +41,7 @@ class TestDataFrameQuery(TestData):
# Now create index # Now create index
index_name = 'eland_test_query' index_name = 'eland_test_query'
ed_df = ed.pandas_to_eland(pd_df, ES_TEST_CLIENT, index_name, if_exists="replace", refresh=True) ed_df = ed.pandas_to_eland(pd_df, ES_TEST_CLIENT, index_name, es_if_exists="replace", es_refresh=True)
assert_pandas_eland_frame_equal(pd_df, ed_df) assert_pandas_eland_frame_equal(pd_df, ed_df)
@ -97,7 +97,7 @@ class TestDataFrameQuery(TestData):
# Now create index # Now create index
index_name = 'eland_test_query' index_name = 'eland_test_query'
ed_df = ed.pandas_to_eland(pd_df, ES_TEST_CLIENT, index_name, if_exists="replace", refresh=True) ed_df = ed.pandas_to_eland(pd_df, ES_TEST_CLIENT, index_name, es_if_exists="replace", es_refresh=True)
assert_pandas_eland_frame_equal(pd_df, ed_df) assert_pandas_eland_frame_equal(pd_df, ed_df)

View File

@ -50,7 +50,7 @@ class TestDataFrameUtils(TestData):
# Now create index # Now create index
index_name = 'eland_test_generate_es_mappings' index_name = 'eland_test_generate_es_mappings'
ed_df = ed.pandas_to_eland(df, ES_TEST_CLIENT, index_name, if_exists="replace", refresh=True) ed_df = ed.pandas_to_eland(df, ES_TEST_CLIENT, index_name, es_if_exists="replace", es_refresh=True)
ed_df_head = ed_df.head() ed_df_head = ed_df.head()
assert_pandas_eland_frame_equal(df, ed_df_head) assert_pandas_eland_frame_equal(df, ed_df_head)

View File

@ -24,7 +24,7 @@ from eland import FieldMappings
DEFAULT_CHUNK_SIZE = 10000 DEFAULT_CHUNK_SIZE = 10000
def read_es(es_params, index_pattern): def read_es(es_client, es_index_pattern):
""" """
Utility method to create an eland.Dataframe from an Elasticsearch index_pattern. Utility method to create an eland.Dataframe from an Elasticsearch index_pattern.
(Similar to pandas.read_csv, but source data is an Elasticsearch index rather than (Similar to pandas.read_csv, but source data is an Elasticsearch index rather than
@ -32,11 +32,11 @@ def read_es(es_params, index_pattern):
Parameters Parameters
---------- ----------
es_params: Elasticsearch client argument(s) es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or - elasticsearch-py parameters or
- elasticsearch-py instance or - elasticsearch-py instance or
- eland.Client instance - eland.Client instance
index_pattern: str es_index_pattern: str
Elasticsearch index pattern Elasticsearch index pattern
Returns Returns
@ -48,13 +48,17 @@ def read_es(es_params, index_pattern):
eland.pandas_to_eland: Create an eland.Dataframe from pandas.DataFrame eland.pandas_to_eland: Create an eland.Dataframe from pandas.DataFrame
eland.eland_to_pandas: Create a pandas.Dataframe from eland.DataFrame eland.eland_to_pandas: Create a pandas.Dataframe from eland.DataFrame
""" """
return DataFrame(client=es_params, index_pattern=index_pattern) return DataFrame(client=es_client, index_pattern=es_index_pattern)
def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunksize=None, def pandas_to_eland(pd_df,
refresh=False, es_client,
dropna=False, es_dest_index,
geo_points=None): es_if_exists='fail',
es_refresh=False,
es_dropna=False,
es_geo_points=None,
chunksize = None):
""" """
Append a pandas DataFrame to an Elasticsearch index. Append a pandas DataFrame to an Elasticsearch index.
Mainly used in testing. Mainly used in testing.
@ -62,31 +66,92 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk
Parameters Parameters
---------- ----------
es_params: Elasticsearch client argument(s) es_client: Elasticsearch client argument(s)
- elasticsearch-py parameters or - elasticsearch-py parameters or
- elasticsearch-py instance or - elasticsearch-py instance or
- eland.Client instance - eland.Client instance
destination_index: str es_dest_index: str
Name of Elasticsearch index to be appended to Name of Elasticsearch index to be appended to
if_exists : {'fail', 'replace', 'append'}, default 'fail' es_if_exists : {'fail', 'replace', 'append'}, default 'fail'
How to behave if the index already exists. How to behave if the index already exists.
- fail: Raise a ValueError. - fail: Raise a ValueError.
- replace: Delete the index before inserting new values. - replace: Delete the index before inserting new values.
- append: Insert new values to the existing index. Create if does not exist. - append: Insert new values to the existing index. Create if does not exist.
refresh: bool, default 'False' es_refresh: bool, default 'False'
Refresh destination_index after bulk index Refresh es_dest_index after bulk index
dropna: bool, default 'False' es_dropna: bool, default 'False'
* True: Remove missing values (see pandas.Series.dropna) * True: Remove missing values (see pandas.Series.dropna)
* False: Include missing values - may cause bulk to fail * False: Include missing values - may cause bulk to fail
geo_points: list, default None es_geo_points: list, default None
List of columns to map to geo_point data type List of columns to map to geo_point data type
chunksize: int, default None
number of pandas.DataFrame rows to read before bulk index into Elasticsearch
Returns Returns
------- -------
eland.Dataframe eland.Dataframe
eland.DataFrame referencing data in destination_index eland.DataFrame referencing data in destination_index
Examples
--------
>>> pd_df = pd.DataFrame(data={'A': 3.141,
... 'B': 1,
... 'C': 'foo',
... 'D': pd.Timestamp('20190102'),
... 'E': [1.0, 2.0, 3.0],
... 'F': False,
... 'G': [1, 2, 3]},
... index=['0', '1', '2'])
>>> type(pd_df)
<class 'pandas.core.frame.DataFrame'>
>>> pd_df
A B ... F G
0 3.141 1 ... False 1
1 3.141 1 ... False 2
2 3.141 1 ... False 3
<BLANKLINE>
[3 rows x 7 columns]
>>> pd_df.dtypes
A float64
B int64
C object
D datetime64[ns]
E float64
F bool
G int64
dtype: object
Convert `pandas.DataFrame` to `eland.DataFrame` - this creates an Elasticsearch index called `pandas_to_eland`.
Overwrite existing Elasticsearch index if it exists `if_exists="replace"`, and sync index so it is
readable on return `refresh=True`
>>> ed_df = ed.pandas_to_eland(pd_df,
... 'localhost',
... 'pandas_to_eland',
... es_if_exists="replace",
... es_refresh=True)
>>> type(ed_df)
<class 'eland.dataframe.DataFrame'>
>>> ed_df
A B ... F G
0 3.141 1 ... False 1
1 3.141 1 ... False 2
2 3.141 1 ... False 3
<BLANKLINE>
[3 rows x 7 columns]
>>> ed_df.dtypes
A float64
B int64
C object
D datetime64[ns]
E float64
F bool
G int64
dtype: object
See Also See Also
-------- --------
eland.read_es: Create an eland.Dataframe from an Elasticsearch index eland.read_es: Create an eland.Dataframe from an Elasticsearch index
@ -95,26 +160,26 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk
if chunksize is None: if chunksize is None:
chunksize = DEFAULT_CHUNK_SIZE chunksize = DEFAULT_CHUNK_SIZE
client = Client(es_params) client = Client(es_client)
mapping = FieldMappings._generate_es_mappings(pd_df, geo_points) mapping = FieldMappings._generate_es_mappings(pd_df, es_geo_points)
# If table exists, check if_exists parameter # If table exists, check if_exists parameter
if client.index_exists(index=destination_index): if client.index_exists(index=es_dest_index):
if if_exists == "fail": if es_if_exists == "fail":
raise ValueError( raise ValueError(
"Could not create the index [{0}] because it " "Could not create the index [{0}] because it "
"already exists. " "already exists. "
"Change the if_exists parameter to " "Change the if_exists parameter to "
"'append' or 'replace' data.".format(destination_index) "'append' or 'replace' data.".format(es_dest_index)
) )
elif if_exists == "replace": elif es_if_exists == "replace":
client.index_delete(index=destination_index) client.index_delete(index=es_dest_index)
client.index_create(index=destination_index, body=mapping) client.index_create(index=es_dest_index, body=mapping)
# elif if_exists == "append": # elif if_exists == "append":
# TODO validate mapping are compatible # TODO validate mapping are compatible
else: else:
client.index_create(index=destination_index, body=mapping) client.index_create(index=es_dest_index, body=mapping)
# Now add data # Now add data
actions = [] actions = []
@ -123,25 +188,25 @@ def pandas_to_eland(pd_df, es_params, destination_index, if_exists='fail', chunk
# Use index as _id # Use index as _id
id = row[0] id = row[0]
if dropna: if es_dropna:
values = row[1].dropna().to_dict() values = row[1].dropna().to_dict()
else: else:
values = row[1].to_dict() values = row[1].to_dict()
# Use integer as id field for repeatable results # Use integer as id field for repeatable results
action = {'_index': destination_index, '_source': values, '_id': str(id)} action = {'_index': es_dest_index, '_source': values, '_id': str(id)}
actions.append(action) actions.append(action)
n = n + 1 n = n + 1
if n % chunksize == 0: if n % chunksize == 0:
client.bulk(actions, refresh=refresh) client.bulk(actions, refresh=es_refresh)
actions = [] actions = []
client.bulk(actions, refresh=refresh) client.bulk(actions, refresh=es_refresh)
ed_df = DataFrame(client, destination_index) ed_df = DataFrame(client, es_dest_index)
return ed_df return ed_df
@ -163,6 +228,36 @@ def eland_to_pandas(ed_df):
pandas.Dataframe pandas.Dataframe
pandas.DataFrame contains all rows and columns in eland.DataFrame pandas.DataFrame contains all rows and columns in eland.DataFrame
Examples
--------
>>> ed_df = ed.DataFrame('localhost', 'flights').head()
>>> type(ed_df)
<class 'eland.dataframe.DataFrame'>
>>> ed_df
AvgTicketPrice Cancelled ... dayOfWeek timestamp
0 841.265642 False ... 0 2018-01-01 00:00:00
1 882.982662 False ... 0 2018-01-01 18:27:00
2 190.636904 False ... 0 2018-01-01 17:11:14
3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00
<BLANKLINE>
[5 rows x 27 columns]
Convert `eland.DataFrame` to `pandas.DataFrame` (Note: this loads entire Elasticsearch index into core memory)
>>> pd_df = ed.eland_to_pandas(ed_df)
>>> type(pd_df)
<class 'pandas.core.frame.DataFrame'>
>>> pd_df
AvgTicketPrice Cancelled ... dayOfWeek timestamp
0 841.265642 False ... 0 2018-01-01 00:00:00
1 882.982662 False ... 0 2018-01-01 18:27:00
2 190.636904 False ... 0 2018-01-01 17:11:14
3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00
<BLANKLINE>
[5 rows x 27 columns]
See Also See Also
-------- --------
eland.read_es: Create an eland.Dataframe from an Elasticsearch index eland.read_es: Create an eland.Dataframe from an Elasticsearch index
@ -275,6 +370,50 @@ def read_csv(filepath_or_buffer,
----- -----
iterator not supported iterator not supported
Examples
--------
See if 'churn' index exists in Elasticsearch
>>> from elasticsearch import Elasticsearch # doctest: +SKIP
>>> es = Elasticsearch() # doctest: +SKIP
>>> es.indices.exists(index="churn") # doctest: +SKIP
False
Read 'churn.csv' and use first column as _id (and eland.DataFrame index)
::
# churn.csv
,state,account length,area code,phone number,international plan,voice mail plan,number vmail messages,total day minutes,total day calls,total day charge,total eve minutes,total eve calls,total eve charge,total night minutes,total night calls,total night charge,total intl minutes,total intl calls,total intl charge,customer service calls,churn
0,KS,128,415,382-4657,no,yes,25,265.1,110,45.07,197.4,99,16.78,244.7,91,11.01,10.0,3,2.7,1,0
1,OH,107,415,371-7191,no,yes,26,161.6,123,27.47,195.5,103,16.62,254.4,103,11.45,13.7,3,3.7,1,0
...
>>> ed.read_csv("churn.csv",
... es_client='localhost',
... es_dest_index='churn',
... es_refresh=True,
... index_col=0) # doctest: +SKIP
account length area code churn customer service calls ... total night calls total night charge total night minutes voice mail plan
0 128 415 0 1 ... 91 11.01 244.7 yes
1 107 415 0 1 ... 103 11.45 254.4 yes
2 137 415 0 0 ... 104 7.32 162.6 no
3 84 408 0 2 ... 89 8.86 196.9 no
4 75 415 0 3 ... 121 8.41 186.9 no
... ... ... ... ... ... ... ... ... ...
3328 192 415 0 2 ... 83 12.56 279.1 yes
3329 68 415 0 3 ... 123 8.61 191.3 no
3330 28 510 0 2 ... 91 8.64 191.9 no
3331 184 510 0 2 ... 137 6.26 139.2 no
3332 74 415 0 0 ... 77 10.86 241.4 yes
<BLANKLINE>
[3333 rows x 21 columns]
Validate data now exists in 'churn' index:
>>> es.search(index="churn", size=1) # doctest: +SKIP
{'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 3333, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'churn', '_id': '0', '_score': 1.0, '_source': {'state': 'KS', 'account length': 128, 'area code': 415, 'phone number': '382-4657', 'international plan': 'no', 'voice mail plan': 'yes', 'number vmail messages': 25, 'total day minutes': 265.1, 'total day calls': 110, 'total day charge': 45.07, 'total eve minutes': 197.4, 'total eve calls': 99, 'total eve charge': 16.78, 'total night minutes': 244.7, 'total night calls': 91, 'total night charge': 11.01, 'total intl minutes': 10.0, 'total intl calls': 3, 'total intl charge': 2.7, 'customer service calls': 1, 'churn': 0}}]}}
TODO - currently the eland.DataFrame may not retain the order of the data in the csv. TODO - currently the eland.DataFrame may not retain the order of the data in the csv.
""" """
kwds = dict() kwds = dict()
@ -342,12 +481,12 @@ def read_csv(filepath_or_buffer,
first_write = True first_write = True
for chunk in reader: for chunk in reader:
if first_write: if first_write:
pandas_to_eland(chunk, client, es_dest_index, if_exists=es_if_exists, chunksize=chunksize, pandas_to_eland(chunk, client, es_dest_index, es_if_exists=es_if_exists, chunksize=chunksize,
refresh=es_refresh, dropna=es_dropna, geo_points=es_geo_points) es_refresh=es_refresh, es_dropna=es_dropna, es_geo_points=es_geo_points)
first_write = False first_write = False
else: else:
pandas_to_eland(chunk, client, es_dest_index, if_exists='append', chunksize=chunksize, pandas_to_eland(chunk, client, es_dest_index, es_if_exists='append', chunksize=chunksize,
refresh=es_refresh, dropna=es_dropna, geo_points=es_geo_points) es_refresh=es_refresh, es_dropna=es_dropna, es_geo_points=es_geo_points)
# Now create an eland.DataFrame that references the new index # Now create an eland.DataFrame that references the new index
ed_df = DataFrame(client, es_dest_index) ed_df = DataFrame(client, es_dest_index)

View File

@ -1,4 +1,4 @@
#!/usr/bin/env bash #!/usr/bin/env bash
python -m eland.tests.setup_tests python -m eland.tests.setup_tests
pytest pytest