From 13e0be6b496569d74db5a0548c9561ceba376d66 Mon Sep 17 00:00:00 2001 From: Augusto Herrmann Date: Fri, 13 Mar 2026 11:18:46 -0300 Subject: [PATCH 1/3] Add function docstring and argument type annotation --- fastetl/operators/db_to_csv_operator.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/fastetl/operators/db_to_csv_operator.py b/fastetl/operators/db_to_csv_operator.py index 4547424..f9d02c3 100644 --- a/fastetl/operators/db_to_csv_operator.py +++ b/fastetl/operators/db_to_csv_operator.py @@ -11,6 +11,7 @@ from typing import Optional from airflow.models.baseoperator import BaseOperator +from airflow.utils.context import Context from fastetl.custom_functions.utils.get_table_cols_name import get_table_cols_name from fastetl.custom_functions.utils.db_connection import get_hook_and_engine_by_provider @@ -95,7 +96,18 @@ def select_all_sql(self): FROM {self.table_scheme}.{self.table_name}; """ - def execute(self, context): + def execute(self, context: Context): + """Executes the SQL query and saves the CSV file. In the process, + converts data types to integers and removes specified characters + if the options have been specified. + + Args: + context (Context): The Airflow context object. + + Returns: + str: The name of the written file. + """ + _ = context # left unused db_hook, _ = get_hook_and_engine_by_provider(self.conn_id) if self.select_sql: From bd0e3ea489d7e229886d9cd260012621eaaf9eab Mon Sep 17 00:00:00 2001 From: Augusto Herrmann Date: Fri, 13 Mar 2026 11:19:50 -0300 Subject: [PATCH 2/3] Add more tolerance to bad data in columns interpreted as integer --- fastetl/operators/db_to_csv_operator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fastetl/operators/db_to_csv_operator.py b/fastetl/operators/db_to_csv_operator.py index f9d02c3..df7d6c0 100644 --- a/fastetl/operators/db_to_csv_operator.py +++ b/fastetl/operators/db_to_csv_operator.py @@ -10,6 +10,8 @@ from pathlib import Path from typing import Optional +import pandas as pd + from airflow.models.baseoperator import BaseOperator from airflow.utils.context import Context @@ -126,7 +128,7 @@ def execute(self, context: Context): # Convert columns data types to int if self.int_columns: for col in self.int_columns: - df[col] = df[col].astype("Int64") + df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64") # Remove specified characters if self.characters_to_remove: From 6f2cca6d27a14c13e971dfb91e83107b9d78fea3 Mon Sep 17 00:00:00 2001 From: Augusto Herrmann Date: Fri, 13 Mar 2026 12:10:25 -0300 Subject: [PATCH 3/3] Refactor select_sql interpretation to be more flexible and handle edge cases --- fastetl/operators/db_to_csv_operator.py | 78 +++++++++++++++++++++---- 1 file changed, 68 insertions(+), 10 deletions(-) diff --git a/fastetl/operators/db_to_csv_operator.py b/fastetl/operators/db_to_csv_operator.py index df7d6c0..1516f9e 100644 --- a/fastetl/operators/db_to_csv_operator.py +++ b/fastetl/operators/db_to_csv_operator.py @@ -87,7 +87,18 @@ def __init__( self.columns_to_remove: Optional[list[str]] = columns_to_remove self.int_columns: Optional[list[str]] = int_columns - def select_all_sql(self): + def _select_all_sql(self) -> str: + """Generate a SELECT statement to fetch all columns from + a table. + + Returns: + str: a SELECT statement for all columns + """ + if any(argument is None for argument in (self.table_scheme, self.table_name)): + raise ValueError( + "table_scheme and table_name are required " + "when select_sql is not provided." + ) cols = get_table_cols_name(self.conn_id, self.table_scheme, self.table_name) if self.columns_to_remove: cols = [c for c in cols if c not in self.columns_to_remove] @@ -98,6 +109,61 @@ def select_all_sql(self): FROM {self.table_scheme}.{self.table_name}; """ + def _starts_with_sql_keyword(self, value: str) -> bool: + """Check if a string starts with a SQL keyword.""" + sql_keywords = ("SELECT", "WITH", "INSERT", "UPDATE", "DELETE") + return any(value.strip().upper().startswith(kw) for kw in sql_keywords) + + def _resolve_select_sql(self, select_sql: Optional[str | Path]) -> str: + """Resolve select_sql to a query string, handling files and inline + queries. + + Args: + select_sql (Optional[str | Path]): The SQL query or file path to + resolve. If a string, it can be either a file path or an inline + SQL query. If a file path, the file must exist. + + Raises: + FileNotFoundError: if a file path is specified but the file does not + exist. + TypeError: if select_sql is neither a string nor a Path object. + ValueError: if select_sql is None or empty and table_scheme or + table_name are not provided. + + Returns: + str: the resolved SQL query string. + """ + + if not select_sql: + return self._select_all_sql() + + # Handle Path objects + if isinstance(select_sql, Path): + if select_sql.is_file(): + return select_sql.read_text(encoding="utf-8") + raise FileNotFoundError(f"File not found: {select_sql}") + + # Handle strings + if isinstance(select_sql, str): + # Quick check: SQL keywords indicate it's a query, not a path + if self._starts_with_sql_keyword(select_sql): + return select_sql + + # Attempt to load as a file path + try: + path = Path(select_sql) + if path.is_file(): + return path.read_text(encoding="utf-8") + except OSError: + # Catches "File name too long" and other path-related OS errors + # Treat as a query string instead + pass + + # Default: treat as a query string + return select_sql + + raise TypeError(f"select_sql must be a string or Path, got {type(select_sql)}") + def execute(self, context: Context): """Executes the SQL query and saves the CSV file. In the process, converts data types to integers and removes specified characters @@ -112,15 +178,7 @@ def execute(self, context: Context): _ = context # left unused db_hook, _ = get_hook_and_engine_by_provider(self.conn_id) - if self.select_sql: - path_sql = Path(self.select_sql) - if path_sql.is_file(): - query = path_sql.read_text(encoding="utf-8") - else: - query = self.select_sql - df_select = query - else: - df_select = self.select_all_sql() + df_select = self._resolve_select_sql(self.select_sql) self.log.info(f"Executing SQL check: {df_select}") df = db_hook.get_pandas_df(df_select)