Je suis nouveau sur ETL et je travaille sur le flux d'air et les flocons de neige. J'utilise un opérateur python pour obtenir la valeur maximale créée à partir d'une table mysql et, sur la base du xcom de cet opérateur, je crée un fichier csv de données de flocon de neige pour vider uniquement les dernières données créées de mysql vers snowflake. Le problème est que lorsque j'extrais la valeur dans le modèle SQL, airflow xcom renvoie des guillemets doubles. Et Snowflake accepte les guillemets simples dans ses requêtes SQL. Photo d'erreur
Voici mon code DAG :
def defaultconverter(o): if isinstance(o, datetime): return o.__str__() def get_max_created_timestamp(sql_table_name): hook = MySqlHook(MYSQL_CONN) check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \ f"and table_schema = '{MYSQL_SCHEMA}';" print(hook.schema) data = hook.get_records(check_column) if any('created_at' in x for x in data): date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}' (created_timestamp_max,) = hook.get_first(date_sql) return json.dumps(created_timestamp_max, default=defaultconverter) # return int(created_timestamp_max) else: return 0 default_args = { "owner": "airflow", "depends_on_past": False, "email": [], "email_on_failure": True, "email_on_retry": False, "retries": 0, "retry_delay": timedelta(minutes=5), "template_searchpath": [TEMPLATE_SEARCHPATH, ] } with DAG(dag_id="lion_sense_snowflake_to_mysql_v1", start_date=datetime(2021, 12, 1, 0, 0, 0, 0), schedule_interval="@daily", catchup=False, default_args=default_args, max_active_runs=1, ) as dag: dag.doc_md = DOCS for table in tables: mysql_table = table["mysql_table"] snowflake_table = table["snowflake_table"] delete_flag = table["delete"] get_max_timestamp_task = PythonOperator( task_id=f"get_max_timestamp_{mysql_table}", python_callable=get_max_created_timestamp, op_args=[mysql_table, ], do_xcom_push=True, ) create_snowflake_table_csv = SnowflakeOperator( task_id=f"create_snowflake_{snowflake_table}_table_csv", dag=dag, sql="sql/convert_snowflake_table_to_csv.sql", snowflake_conn_id=SNOWFLAKE_CONN_ID, warehouse=SNOWFLAKE_WAREHOUSE, database=SNOWFLAKE_DATABASE, schema=SNOWFLAKE_SCHEMA, role=SNOWFLAKE_ROLE, params={ "snowflake_table": snowflake_table, "delete_flag": delete_flag, "max_date": get_max_timestamp_task.output } )
Modèle de requête MySQL :
copy into @S3_TKXEL_DEVEOPMENT_STAGE/airflow/{{ dag.dag_id }}/{{ ds_nodash }}/{{ params.snowflake_table }}/{{ ds_nodash }}_{{ params.snowflake_table }}.csv from ( select * from {{ params.snowflake_table }} {% if params.delete_flag %} where created_at > {{ params.max_date}} {% endif %} ) file_format = (TYPE = CSV, COMPRESSION = NONE, NULL_IF=(''), field_optionally_enclosed_by='"' ) OVERWRITE = TRUE SINGLE = TRUE MAX_FILE_SIZE=5000000000;
Merci d'avance d'élargir mes connaissances.
S'il faut modifier la sortie de
get_max_created_timestamp
:Aide, la chaîne sera désormais correctement citée pour représenter les attentes en matière de chaîne de flocon de neige.