From 28e6d92430eb6740cd36096e531bf459723551e9 Mon Sep 17 00:00:00 2001 From: Bart Broere Date: Mon, 6 Nov 2023 11:39:31 +0100 Subject: [PATCH] Stream writes in to_csv() Co-authored-by: P. Sai Vinay --- eland/common.py | 1 - eland/operations.py | 40 ++++++++++++++++++++++++++++++---------- eland/query_compiler.py | 4 ++-- 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/eland/common.py b/eland/common.py index 5f50625..71e9bb6 100644 --- a/eland/common.py +++ b/eland/common.py @@ -41,7 +41,6 @@ if TYPE_CHECKING: # Default number of rows displayed (different to pandas where ALL could be displayed) DEFAULT_NUM_ROWS_DISPLAYED = 60 DEFAULT_CHUNK_SIZE = 10000 -DEFAULT_CSV_BATCH_OUTPUT_SIZE = 10000 DEFAULT_PROGRESS_REPORTING_NUM_ROWS = 10000 DEFAULT_SEARCH_SIZE = 5000 DEFAULT_PIT_KEEP_ALIVE = "3m" diff --git a/eland/operations.py b/eland/operations.py index a6b20f2..cf25411 100644 --- a/eland/operations.py +++ b/eland/operations.py @@ -1218,6 +1218,36 @@ class Operations: ["count", "mean", "std", "min", "25%", "50%", "75%", "max"] ) + def to_csv( # type: ignore + self, + query_compiler: "QueryCompiler", + path_or_buf=None, + header: bool = True, + mode: str = "w", + show_progress: bool = False, + **kwargs, + ) -> Optional[str]: + result = [] + processed = 0 + for i, df in enumerate( + self.search_yield_pandas_dataframes(query_compiler=query_compiler) + ): + processed += df.shape[0] + if show_progress and processed % DEFAULT_PROGRESS_REPORTING_NUM_ROWS == 0: + print(f"{datetime.now()}: read {processed} rows") + result.append( + df.to_csv( + path_or_buf=path_or_buf, + # start appending after the first batch + mode=mode if i == 0 else "a", + # only write the header for the first batch, if wanted at all + header=header if i == 0 else False, + **kwargs, + ) + ) + if path_or_buf is None: + return "".join(result) + def to_pandas( self, query_compiler: "QueryCompiler", show_progress: bool = False ) -> pd.DataFrame: @@ -1239,16 +1269,6 @@ class Operations: return query_compiler._empty_pd_ef() return pd.concat(df_list) - def to_csv( - self, - query_compiler: "QueryCompiler", - show_progress: bool = False, - **kwargs: Union[bool, str], - ) -> Optional[str]: - return self.to_pandas( # type: ignore[no-any-return] - query_compiler=query_compiler, show_progress=show_progress - ).to_csv(**kwargs) - def search_yield_pandas_dataframes( self, query_compiler: "QueryCompiler" ) -> Generator["pd.DataFrame", None, None]: diff --git a/eland/query_compiler.py b/eland/query_compiler.py index 6af60e5..98ef614 100644 --- a/eland/query_compiler.py +++ b/eland/query_compiler.py @@ -497,7 +497,7 @@ class QueryCompiler: return self._update_query(QueryFilter(query)) # To/From Pandas - def to_pandas(self, show_progress: bool = False): + def to_pandas(self, show_progress: bool = False) -> pd.DataFrame: """Converts Eland DataFrame to Pandas DataFrame. Returns: @@ -512,7 +512,7 @@ class QueryCompiler: Returns: If path_or_buf is None, returns the resulting csv format as a string. Otherwise returns None. """ - return self._operations.to_csv(self, **kwargs) + return self._operations.to_csv(query_compiler=self, **kwargs) def search_yield_pandas_dataframes(self) -> Generator["pd.DataFrame", None, None]: return self._operations.search_yield_pandas_dataframes(self)