From 33cf029efeb0948cb5cc8c1f09ffc397e56acb52 Mon Sep 17 00:00:00 2001 From: Bart Broere Date: Thu, 15 Feb 2024 08:32:54 +0100 Subject: [PATCH] Implement eland.DataFrame.to_json (#661) Co-authored-by: Quentin Pradet --- docs/sphinx/reference/api/eland.DataFrame.rst | 1 + .../reference/api/eland.DataFrame.to_json.rst | 6 + docs/sphinx/reference/dataframe.rst | 1 + docs/sphinx/reference/supported_apis.rst | 2 +- eland/dataframe.py | 42 ++++++ eland/operations.py | 41 +++++ eland/query_compiler.py | 8 + tests/dataframe/test_to_csv_pytest.py | 2 +- tests/dataframe/test_to_json_pytest.py | 141 ++++++++++++++++++ 9 files changed, 242 insertions(+), 2 deletions(-) create mode 100644 docs/sphinx/reference/api/eland.DataFrame.to_json.rst create mode 100644 tests/dataframe/test_to_json_pytest.py diff --git a/docs/sphinx/reference/api/eland.DataFrame.rst b/docs/sphinx/reference/api/eland.DataFrame.rst index 8daeed3..e24e87d 100644 --- a/docs/sphinx/reference/api/eland.DataFrame.rst +++ b/docs/sphinx/reference/api/eland.DataFrame.rst @@ -49,6 +49,7 @@ ~DataFrame.tail ~DataFrame.to_csv ~DataFrame.to_html + ~DataFrame.to_json ~DataFrame.to_numpy ~DataFrame.to_pandas ~DataFrame.to_string diff --git a/docs/sphinx/reference/api/eland.DataFrame.to_json.rst b/docs/sphinx/reference/api/eland.DataFrame.to_json.rst new file mode 100644 index 0000000..be89aa4 --- /dev/null +++ b/docs/sphinx/reference/api/eland.DataFrame.to_json.rst @@ -0,0 +1,6 @@ +eland.DataFrame.to\_json +======================== + +.. currentmodule:: eland + +.. 
automethod:: DataFrame.to_json diff --git a/docs/sphinx/reference/dataframe.rst b/docs/sphinx/reference/dataframe.rst index 2faf9dd..36a5e19 100644 --- a/docs/sphinx/reference/dataframe.rst +++ b/docs/sphinx/reference/dataframe.rst @@ -140,5 +140,6 @@ Serialization / IO / Conversion DataFrame.to_numpy DataFrame.to_csv DataFrame.to_html + DataFrame.to_json DataFrame.to_string DataFrame.to_pandas diff --git a/docs/sphinx/reference/supported_apis.rst b/docs/sphinx/reference/supported_apis.rst index 3a7eacb..f306239 100644 --- a/docs/sphinx/reference/supported_apis.rst +++ b/docs/sphinx/reference/supported_apis.rst @@ -395,7 +395,7 @@ script instead of being modified manually. +---------------------------------------+------------+ | ``ed.DataFrame.to_html()`` | **Yes** | +---------------------------------------+------------+ -| ``ed.DataFrame.to_json()`` | No | +| ``ed.DataFrame.to_json()`` | **Yes** | +---------------------------------------+------------+ | ``ed.DataFrame.to_latex()`` | No | +---------------------------------------+------------+ diff --git a/eland/dataframe.py b/eland/dataframe.py index 045166b..3d71eee 100644 --- a/eland/dataframe.py +++ b/eland/dataframe.py @@ -1342,6 +1342,48 @@ class DataFrame(NDFrame): } return self._query_compiler.to_csv(**kwargs) + def to_json( + self, + path_or_buf=None, + orient=None, + date_format=None, + double_precision=10, + force_ascii=True, + date_unit="ms", + default_handler=None, + lines=False, + compression="infer", + index=True, + indent=None, + storage_options=None, + ): + """Write Elasticsearch data to a json file. + + By setting the ``lines`` parameter to ``True``, and ``orient`` to ``'records'``, + the entire DataFrame can be written in a streaming manner. + Doing so avoids the need to have the entire DataFrame in memory. + This format is known as JSON lines and can use the file extension ``.jsonl``. 
+ + See Also + -------- + :pandas_api_docs:`pandas.DataFrame.to_json` + """ + kwargs = { + "path_or_buf": path_or_buf, + "orient": orient, + "date_format": date_format, + "double_precision": double_precision, + "force_ascii": force_ascii, + "date_unit": date_unit, + "default_handler": default_handler, + "lines": lines, + "compression": compression, + "index": index, + "indent": indent, + "storage_options": storage_options, + } + return self._query_compiler.to_json(**kwargs) + def to_pandas(self, show_progress: bool = False) -> pd.DataFrame: """ Utility method to convert eland.Dataframe to pandas.Dataframe diff --git a/eland/operations.py b/eland/operations.py index f9b58f0..168001b 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -16,6 +16,7 @@ # under the License. import copy +import os import warnings from collections import defaultdict from datetime import datetime @@ -1250,6 +1251,46 @@ class Operations: if path_or_buf is None: return "".join(result) + def to_json( # type: ignore + self, + query_compiler: "QueryCompiler", + path_or_buf=None, + orient=None, + lines=False, + **kwargs, + ): + if orient == "records" and lines is True: + result: List[str] = [] + our_filehandle = False + if isinstance(path_or_buf, os.PathLike): + buf = open(path_or_buf, "w") + our_filehandle = True + elif isinstance(path_or_buf, str): + buf = open(path_or_buf, "w") + our_filehandle = True + else: + buf = path_or_buf + for i, df in enumerate( + self.search_yield_pandas_dataframes(query_compiler=query_compiler) + ): + output = df.to_json( + orient=orient, + lines=lines, + **kwargs, + ) + if buf is None: + result.append(output) + else: + buf.write(output) + # If we opened the file ourselves, we should close it + if our_filehandle: + buf.close() + return "".join(result) or None + else: + return self.to_pandas(query_compiler=query_compiler).to_json( + path_or_buf, orient=orient, lines=lines, **kwargs + ) + def to_pandas( self, query_compiler: "QueryCompiler", show_progress: 
bool = False ) -> pd.DataFrame: diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 98ef614..6eb818e 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -514,6 +514,14 @@ class QueryCompiler: """ return self._operations.to_csv(query_compiler=self, **kwargs) + def to_json(self, **kwargs) -> Optional[str]: + """Serialises Eland Dataframe to JSON + + Returns: + If path_or_buf is None, returns the resulting json as a string. + """ + return self._operations.to_json(query_compiler=self, **kwargs) + def search_yield_pandas_dataframes(self) -> Generator["pd.DataFrame", None, None]: return self._operations.search_yield_pandas_dataframes(self) diff --git a/tests/dataframe/test_to_csv_pytest.py b/tests/dataframe/test_to_csv_pytest.py index 1971d77..7b1ec4a 100644 --- a/tests/dataframe/test_to_csv_pytest.py +++ b/tests/dataframe/test_to_csv_pytest.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -# File called _pytest for PyCharm compatability +# File called _pytest for PyCharm compatibility import ast import time diff --git a/tests/dataframe/test_to_json_pytest.py b/tests/dataframe/test_to_json_pytest.py new file mode 100644 index 0000000..da13497 --- /dev/null +++ b/tests/dataframe/test_to_json_pytest.py @@ -0,0 +1,141 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +# File called _pytest for PyCharm compatibility + +from io import StringIO +from pathlib import Path + +import pandas +from pandas.testing import assert_frame_equal + +from tests.common import ROOT_DIR, TestData + + +class TestDataFrameToJSON(TestData): + + def test_to_json_default_arguments(self): + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() + ed_flights.to_json(ROOT_DIR + "/dataframe/results/eland_to_json.jsonl") + pd_flights.to_json(ROOT_DIR + "/dataframe/results/pandas_to_json.jsonl") + + assert_frame_equal( + pandas.read_json(ROOT_DIR + "/dataframe/results/eland_to_json.jsonl"), + pandas.read_json(ROOT_DIR + "/dataframe/results/pandas_to_json.jsonl"), + ) + + def test_to_json_streaming_mode(self): + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() + ed_flights.to_json( + ROOT_DIR + "/dataframe/results/streaming_eland_to_json.jsonl", + lines=True, + orient="records", + ) + pd_flights.to_json( + ROOT_DIR + "/dataframe/results/streaming_pandas_to_json.jsonl", + lines=True, + orient="records", + ) + + assert_frame_equal( + pandas.read_json( + ROOT_DIR + "/dataframe/results/streaming_eland_to_json.jsonl", + lines=True, + orient="records", + ), + pandas.read_json( + ROOT_DIR + "/dataframe/results/streaming_pandas_to_json.jsonl", + lines=True, + orient="records", + ), + ) + + def test_to_json_streaming_mode_pathlib(self): + root_dir = Path(ROOT_DIR) + + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() + ed_flights.to_json( + root_dir / "dataframe" / "results" / "pathlib_eland_to_json.jsonl", + lines=True, + orient="records", + ) + pd_flights.to_json( + root_dir / "dataframe" / "results" / "pathlib_pandas_to_json.jsonl", + lines=True, + orient="records", + ) + + assert_frame_equal( + pandas.read_json( + root_dir / "dataframe" / "results" / "pathlib_eland_to_json.jsonl", + lines=True, + orient="records", + ), + 
pandas.read_json( + root_dir / "dataframe" / "results" / "pathlib_pandas_to_json.jsonl", + lines=True, + orient="records", + ), + ) + + def test_to_json_with_other_buffer(self): + + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() + output_buffer = StringIO() + ed_flights.to_json(output_buffer, lines=True, orient="records") + output_string = pd_flights.to_json(lines=True, orient="records") + + output_buffer.seek(0) # rewind our StringIO object + + assert_frame_equal( + pandas.read_json(output_buffer, lines=True, orient="records"), + pandas.read_json( + StringIO(output_string), + lines=True, + orient="records", + ), + ) + + def test_to_json_with_file_handle(self): + root_dir = Path(ROOT_DIR) + + ed_flights = self.ed_flights() + pd_flights = self.pd_flights() + with open( + root_dir / "dataframe" / "results" / "fh_eland_to_json.jsonl", "w" + ) as w: + ed_flights.to_json(w) + pd_flights.to_json( + root_dir / "dataframe" / "results" / "check_pandas_to_json.jsonl" + ) + + assert_frame_equal( + pandas.read_json( + ROOT_DIR + "/dataframe/results/fh_eland_to_json.jsonl", + lines=True, + orient="records", + ), + pandas.read_json( + ROOT_DIR + "/dataframe/results/check_pandas_to_json.jsonl", + lines=True, + orient="records", + ), + )