Fix non _source fields missing from the result hits (#693)

This commit is contained in:
Bart Broere 2024-06-10 09:09:52 +02:00 committed by GitHub
parent 632074c0f0
commit 1014ecdb39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 240 additions and 221 deletions

View File

@ -83,7 +83,7 @@ class DataFrame(NDFrame):
3 181.694216 True ... 0 2018-01-01 10:33:28 3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00 4 730.041778 False ... 0 2018-01-01 05:13:00
<BLANKLINE> <BLANKLINE>
[5 rows x 27 columns] [5 rows x 28 columns]
Constructing DataFrame from an Elasticsearch client and an Elasticsearch index Constructing DataFrame from an Elasticsearch client and an Elasticsearch index
@ -173,13 +173,13 @@ class DataFrame(NDFrame):
>>> df = ed.DataFrame('http://localhost:9200', 'flights') >>> df = ed.DataFrame('http://localhost:9200', 'flights')
>>> assert isinstance(df.columns, pd.Index) >>> assert isinstance(df.columns, pd.Index)
>>> df.columns >>> df.columns
Index(['AvgTicketPrice', 'Cancelled', 'Carrier', 'Dest', 'DestAirportID', 'DestCityName', Index(['AvgTicketPrice', 'Cancelled', 'Carrier', 'Cities', 'Dest', 'DestAirportID', 'DestCityName',
... 'DestCountry', 'DestLocation', 'DestRegion', 'DestWeather', 'DistanceKilometers', 'DestCountry', 'DestLocation', 'DestRegion', 'DestWeather', 'DistanceKilometers',
... 'DistanceMiles', 'FlightDelay', 'FlightDelayMin', 'FlightDelayType', 'FlightNum', 'DistanceMiles', 'FlightDelay', 'FlightDelayMin', 'FlightDelayType', 'FlightNum',
... 'FlightTimeHour', 'FlightTimeMin', 'Origin', 'OriginAirportID', 'OriginCityName', 'FlightTimeHour', 'FlightTimeMin', 'Origin', 'OriginAirportID', 'OriginCityName',
... 'OriginCountry', 'OriginLocation', 'OriginRegion', 'OriginWeather', 'dayOfWeek', 'OriginCountry', 'OriginLocation', 'OriginRegion', 'OriginWeather', 'dayOfWeek',
... 'timestamp'], 'timestamp'],
... dtype='object') dtype='object')
""" """
return self._query_compiler.columns return self._query_compiler.columns
@ -2014,9 +2014,9 @@ class DataFrame(NDFrame):
-------- --------
>>> df = ed.DataFrame('http://localhost:9200', 'flights') >>> df = ed.DataFrame('http://localhost:9200', 'flights')
>>> df.shape >>> df.shape
(13059, 27) (13059, 28)
>>> df.query('FlightDelayMin > 60').shape >>> df.query('FlightDelayMin > 60').shape
(2730, 27) (2730, 28)
""" """
if isinstance(expr, BooleanFilter): if isinstance(expr, BooleanFilter):
return DataFrame( return DataFrame(

View File

@ -262,7 +262,7 @@ def eland_to_pandas(ed_df: DataFrame, show_progress: bool = False) -> pd.DataFra
3 181.694216 True ... 0 2018-01-01 10:33:28 3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00 4 730.041778 False ... 0 2018-01-01 05:13:00
<BLANKLINE> <BLANKLINE>
[5 rows x 27 columns] [5 rows x 28 columns]
Convert `eland.DataFrame` to `pandas.DataFrame` (Note: this loads entire Elasticsearch index into core memory) Convert `eland.DataFrame` to `pandas.DataFrame` (Note: this loads entire Elasticsearch index into core memory)
@ -277,7 +277,7 @@ def eland_to_pandas(ed_df: DataFrame, show_progress: bool = False) -> pd.DataFra
3 181.694216 True ... 0 2018-01-01 10:33:28 3 181.694216 True ... 0 2018-01-01 10:33:28
4 730.041778 False ... 0 2018-01-01 05:13:00 4 730.041778 False ... 0 2018-01-01 05:13:00
<BLANKLINE> <BLANKLINE>
[5 rows x 27 columns] [5 rows x 28 columns]
Convert `eland.DataFrame` to `pandas.DataFrame` and show progress every 10000 rows Convert `eland.DataFrame` to `pandas.DataFrame` and show progress every 10000 rows

View File

@ -1543,6 +1543,16 @@ def quantile_to_percentile(quantile: Union[int, float]) -> float:
return float(min(100, max(0, quantile * 100))) return float(min(100, max(0, quantile * 100)))
def is_field_already_present(key: str, dictionary: Dict[str, Any]) -> bool:
    """Return True if the (possibly dotted) ``key`` exists in ``dictionary``.

    Dotted keys such as ``"a.b"`` are resolved recursively: ``"a"`` must map
    to a nested dict that in turn contains ``"b"``.

    Parameters
    ----------
    key: str
        Field name, optionally dotted to address nested objects.
    dictionary: Dict[str, Any]
        Mapping to probe (typically a hit's ``_source``).
    """
    if "." in key:
        head, _, rest = key.partition(".")
        nested = dictionary.get(head)
        # Guard: a non-dict intermediate value (int, str, list, ...) cannot
        # contain nested fields. Without this check, `rest in nested` would
        # raise TypeError on e.g. an int, or do a false-positive *substring*
        # match on a str.
        if not isinstance(nested, dict):
            return False
        return is_field_already_present(rest, nested)
    return key in dictionary
def _search_yield_hits( def _search_yield_hits(
query_compiler: "QueryCompiler", query_compiler: "QueryCompiler",
body: Dict[str, Any], body: Dict[str, Any],
@ -1600,10 +1610,24 @@ def _search_yield_hits(
# Modify the search with the new point in time ID and keep-alive time. # Modify the search with the new point in time ID and keep-alive time.
body["pit"] = {"id": pit_id, "keep_alive": DEFAULT_PIT_KEEP_ALIVE} body["pit"] = {"id": pit_id, "keep_alive": DEFAULT_PIT_KEEP_ALIVE}
if isinstance(body["_source"], list):
body["fields"] = body["_source"]
while max_number_of_hits is None or hits_yielded < max_number_of_hits: while max_number_of_hits is None or hits_yielded < max_number_of_hits:
resp = client.search(**body) resp = client.search(**body)
hits: List[Dict[str, Any]] = resp["hits"]["hits"] hits: List[Dict[str, Any]] = []
for hit in resp["hits"]["hits"]:
# Copy some of the fields to _source if they are missing there.
if "fields" in hit and "_source" in hit:
fields = hit["fields"]
del hit["fields"]
for k, v in fields.items():
if not is_field_already_present(k, hit["_source"]):
if isinstance(v, list):
hit["_source"][k] = list(sorted(v))
else:
hit["_source"][k] = v
hits.append(hit)
# The point in time ID can change between searches so we # The point in time ID can change between searches so we
# need to keep the next search up-to-date # need to keep the next search up-to-date

View File

@ -43,7 +43,7 @@ FLIGHTS_MAPPING = {
"Carrier": {"type": "keyword"}, "Carrier": {"type": "keyword"},
"Dest": {"type": "keyword"}, "Dest": {"type": "keyword"},
"DestAirportID": {"type": "keyword"}, "DestAirportID": {"type": "keyword"},
"DestCityName": {"type": "keyword"}, "DestCityName": {"type": "keyword", "copy_to": "Cities"},
"DestCountry": {"type": "keyword"}, "DestCountry": {"type": "keyword"},
"DestLocation": {"type": "geo_point"}, "DestLocation": {"type": "geo_point"},
"DestRegion": {"type": "keyword"}, "DestRegion": {"type": "keyword"},
@ -58,11 +58,12 @@ FLIGHTS_MAPPING = {
"FlightTimeMin": {"type": "float"}, "FlightTimeMin": {"type": "float"},
"Origin": {"type": "keyword"}, "Origin": {"type": "keyword"},
"OriginAirportID": {"type": "keyword"}, "OriginAirportID": {"type": "keyword"},
"OriginCityName": {"type": "keyword"}, "OriginCityName": {"type": "keyword", "copy_to": "Cities"},
"OriginCountry": {"type": "keyword"}, "OriginCountry": {"type": "keyword"},
"OriginLocation": {"type": "geo_point"}, "OriginLocation": {"type": "geo_point"},
"OriginRegion": {"type": "keyword"}, "OriginRegion": {"type": "keyword"},
"OriginWeather": {"type": "keyword"}, "OriginWeather": {"type": "keyword"},
"Cities": {"type": "text"},
"dayOfWeek": {"type": "byte"}, "dayOfWeek": {"type": "byte"},
"timestamp": {"type": "date", "format": "strict_date_hour_minute_second"}, "timestamp": {"type": "date", "format": "strict_date_hour_minute_second"},
} }

View File

@ -46,6 +46,10 @@ _pd_flights = pd.DataFrame.from_records(flight_records).reindex(
_ed_flights.columns, axis=1 _ed_flights.columns, axis=1
) )
_pd_flights["timestamp"] = pd.to_datetime(_pd_flights["timestamp"]) _pd_flights["timestamp"] = pd.to_datetime(_pd_flights["timestamp"])
# Mimic what copy_to in an Elasticsearch mapping would do, combining the two fields in a list
_pd_flights["Cities"] = _pd_flights.apply(
lambda x: list(sorted([x["OriginCityName"], x["DestCityName"]])), axis=1
)
_pd_flights.index = _pd_flights.index.map(str) # make index 'object' not int _pd_flights.index = _pd_flights.index.map(str) # make index 'object' not int
_pd_flights_small = _pd_flights.head(48) _pd_flights_small = _pd_flights.head(48)

View File

@ -43,6 +43,7 @@ class TestDataFrameDtypes:
"AvgTicketPrice": "float", "AvgTicketPrice": "float",
"Cancelled": "boolean", "Cancelled": "boolean",
"Carrier": "keyword", "Carrier": "keyword",
"Cities": "text",
"Dest": "keyword", "Dest": "keyword",
"DestAirportID": "keyword", "DestAirportID": "keyword",
"DestCityName": "keyword", "DestCityName": "keyword",

View File

@ -41,8 +41,9 @@ class TestDataFrameToCSV(TestData):
results_file, results_file,
index_col=0, index_col=0,
converters={ converters={
"DestLocation": lambda x: ast.literal_eval(x), "DestLocation": ast.literal_eval,
"OriginLocation": lambda x: ast.literal_eval(x), "OriginLocation": ast.literal_eval,
"Cities": ast.literal_eval,
}, },
) )
pd_from_csv.index = pd_from_csv.index.map(str) pd_from_csv.index = pd_from_csv.index.map(str)
@ -63,8 +64,9 @@ class TestDataFrameToCSV(TestData):
results_file, results_file,
index_col=0, index_col=0,
converters={ converters={
"DestLocation": lambda x: ast.literal_eval(x), "DestLocation": ast.literal_eval,
"OriginLocation": lambda x: ast.literal_eval(x), "OriginLocation": ast.literal_eval,
"Cities": ast.literal_eval,
}, },
) )
pd_from_csv.index = pd_from_csv.index.map(str) pd_from_csv.index = pd_from_csv.index.map(str)
@ -112,8 +114,9 @@ class TestDataFrameToCSV(TestData):
results, results,
index_col=0, index_col=0,
converters={ converters={
"DestLocation": lambda x: ast.literal_eval(x), "DestLocation": ast.literal_eval,
"OriginLocation": lambda x: ast.literal_eval(x), "OriginLocation": ast.literal_eval,
"Cities": ast.literal_eval,
}, },
) )
pd_from_csv.index = pd_from_csv.index.map(str) pd_from_csv.index = pd_from_csv.index.map(str)

File diff suppressed because one or more lines are too long

View File

@ -19,7 +19,7 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"False" "HeadApiResponse(False)"
] ]
}, },
"execution_count": 2, "execution_count": 2,
@ -43,8 +43,8 @@
"name": "stdout", "name": "stdout",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"2021-03-30 11:57:39.116425: read 10000 rows\n", "2024-05-21 09:07:17.882569: read 10000 rows\n",
"2021-03-30 11:57:39.522722: read 13059 rows\n" "2024-05-21 09:07:18.375305: read 13059 rows\n"
] ]
} }
], ],
@ -78,6 +78,18 @@
"execution_count": 5, "execution_count": 5,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/codespace/.python/current/lib/python3.10/site-packages/eland/etl.py:529: FutureWarning: the 'mangle_dupe_cols' keyword is deprecated and will be removed in a future version. Please take steps to stop the use of 'mangle_dupe_cols'\n",
" reader = pd.read_csv(filepath_or_buffer, **kwargs)\n",
"/home/codespace/.python/current/lib/python3.10/site-packages/eland/etl.py:529: FutureWarning: The squeeze argument has been deprecated and will be removed in a future version. Append .squeeze(\"columns\") to the call to squeeze.\n",
"\n",
"\n",
" reader = pd.read_csv(filepath_or_buffer, **kwargs)\n"
]
},
{ {
"data": { "data": {
"text/html": [ "text/html": [
@ -218,35 +230,7 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"{'took': 0,\n", "ObjectApiResponse({'took': 0, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 2, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'churn', '_id': '0', '_score': 1.0, '_source': {'state': 'KS', 'account length': 128, 'area code': 415, 'phone number': '382-4657', 'international plan': 'no', 'voice mail plan': 'yes', 'number vmail messages': 25, 'total day minutes': 265.1, 'total day calls': 110, 'total day charge': 45.07, 'total eve minutes': 197.4, 'total eve calls': 99, 'total eve charge': 16.78, 'total night minutes': 244.7, 'total night calls': 91, 'total night charge': 11.01, 'total intl minutes': 10.0, 'total intl calls': 3, 'total intl charge': 2.7, 'customer service calls': 1, 'churn': 0}}]}})"
" 'timed_out': False,\n",
" '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},\n",
" 'hits': {'total': {'value': 2, 'relation': 'eq'},\n",
" 'max_score': 1.0,\n",
" 'hits': [{'_index': 'churn',\n",
" '_id': '0',\n",
" '_score': 1.0,\n",
" '_source': {'state': 'KS',\n",
" 'account length': 128,\n",
" 'area code': 415,\n",
" 'phone number': '382-4657',\n",
" 'international plan': 'no',\n",
" 'voice mail plan': 'yes',\n",
" 'number vmail messages': 25,\n",
" 'total day minutes': 265.1,\n",
" 'total day calls': 110,\n",
" 'total day charge': 45.07,\n",
" 'total eve minutes': 197.4,\n",
" 'total eve calls': 99,\n",
" 'total eve charge': 16.78,\n",
" 'total night minutes': 244.7,\n",
" 'total night calls': 91,\n",
" 'total night charge': 11.01,\n",
" 'total intl minutes': 10.0,\n",
" 'total intl calls': 3,\n",
" 'total intl charge': 2.7,\n",
" 'customer service calls': 1,\n",
" 'churn': 0}}]}}"
] ]
}, },
"execution_count": 6, "execution_count": 6,
@ -267,7 +251,7 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"{'acknowledged': True}" "ObjectApiResponse({'acknowledged': True})"
] ]
}, },
"execution_count": 7, "execution_count": 7,
@ -297,7 +281,7 @@
"name": "python", "name": "python",
"nbconvert_exporter": "python", "nbconvert_exporter": "python",
"pygments_lexer": "ipython3", "pygments_lexer": "ipython3",
"version": "3.8.5" "version": "3.10.13"
} }
}, },
"nbformat": 4, "nbformat": 4,

View File

@ -33,10 +33,10 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"AvgTicketPrice 640.387285\n", "AvgTicketPrice 639.433214\n",
"Cancelled False\n", "Cancelled False\n",
"dayOfWeek 3\n", "dayOfWeek 2\n",
"timestamp 2018-01-21 23:43:19.256498944\n", "timestamp 2018-01-21 20:23:15.159835648\n",
"dtype: object" "dtype: object"
] ]
}, },
@ -58,9 +58,9 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"AvgTicketPrice 640.387285\n", "AvgTicketPrice 639.433214\n",
"Cancelled 0.000000\n", "Cancelled 0.000000\n",
"dayOfWeek 3.000000\n", "dayOfWeek 2.935777\n",
"dtype: float64" "dtype: float64"
] ]
}, },
@ -82,10 +82,10 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"AvgTicketPrice 640.387285\n", "AvgTicketPrice 639.433214\n",
"Cancelled False\n", "Cancelled False\n",
"dayOfWeek 3\n", "dayOfWeek 2\n",
"timestamp 2018-01-21 23:43:19.256498944\n", "timestamp 2018-01-21 20:23:15.159835648\n",
"DestCountry NaN\n", "DestCountry NaN\n",
"dtype: object" "dtype: object"
] ]
@ -108,7 +108,7 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"AvgTicketPrice 213.430365\n", "AvgTicketPrice 213.453156\n",
"dayOfWeek 2.000000\n", "dayOfWeek 2.000000\n",
"dtype: float64" "dtype: float64"
] ]
@ -131,7 +131,7 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"AvgTicketPrice 213.430365\n", "AvgTicketPrice 213.453156\n",
"dayOfWeek 2.000000\n", "dayOfWeek 2.000000\n",
"dtype: float64" "dtype: float64"
] ]
@ -154,7 +154,7 @@
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"AvgTicketPrice 213.430365\n", "AvgTicketPrice 213.453156\n",
"Cancelled NaN\n", "Cancelled NaN\n",
"dayOfWeek 2.0\n", "dayOfWeek 2.0\n",
"timestamp NaT\n", "timestamp NaT\n",
@ -189,7 +189,7 @@
"name": "python", "name": "python",
"nbconvert_exporter": "python", "nbconvert_exporter": "python",
"pygments_lexer": "ipython3", "pygments_lexer": "ipython3",
"version": "3.8.5" "version": "3.10.13"
} }
}, },
"nbformat": 4, "nbformat": 4,

File diff suppressed because one or more lines are too long