Saya baru dalam ETL dan mengusahakan aliran udara dan kepingan salji. Saya menggunakan pengendali python untuk mendapatkan nilai ciptaan maksimum daripada jadual mysql dan berdasarkan xcom pengendali itu, saya mencipta fail csv data kepingan salji untuk membuang hanya data ciptaan terkini dari mysql ke kepingan salji. Masalahnya ialah apabila saya mengekstrak nilai dalam templat sql, aliran udara xcom mengembalikan petikan berganda. Dan Snowflake menerima petikan tunggal dalam pertanyaan sqlnya. Gambar ralat
Ini kod DAG saya:
def defaultconverter(o): if isinstance(o, datetime): return o.__str__() def get_max_created_timestamp(sql_table_name): hook = MySqlHook(MYSQL_CONN) check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \ f"and table_schema = '{MYSQL_SCHEMA}';" print(hook.schema) data = hook.get_records(check_column) if any('created_at' in x for x in data): date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}' (created_timestamp_max,) = hook.get_first(date_sql) return json.dumps(created_timestamp_max, default=defaultconverter) # return int(created_timestamp_max) else: return 0 default_args = { "owner": "airflow", "depends_on_past": False, "email": [], "email_on_failure": True, "email_on_retry": False, "retries": 0, "retry_delay": timedelta(minutes=5), "template_searchpath": [TEMPLATE_SEARCHPATH, ] } with DAG(dag_id="lion_sense_snowflake_to_mysql_v1", start_date=datetime(2021, 12, 1, 0, 0, 0, 0), schedule_interval="@daily", catchup=False, default_args=default_args, max_active_runs=1, ) as dag: dag.doc_md = DOCS for table in tables: mysql_table = table["mysql_table"] snowflake_table = table["snowflake_table"] delete_flag = table["delete"] get_max_timestamp_task = PythonOperator( task_id=f"get_max_timestamp_{mysql_table}", python_callable=get_max_created_timestamp, op_args=[mysql_table, ], do_xcom_push=True, ) create_snowflake_table_csv = SnowflakeOperator( task_id=f"create_snowflake_{snowflake_table}_table_csv", dag=dag, sql="sql/convert_snowflake_table_to_csv.sql", snowflake_conn_id=SNOWFLAKE_CONN_ID, warehouse=SNOWFLAKE_WAREHOUSE, database=SNOWFLAKE_DATABASE, schema=SNOWFLAKE_SCHEMA, role=SNOWFLAKE_ROLE, params={ "snowflake_table": snowflake_table, "delete_flag": delete_flag, "max_date": get_max_timestamp_task.output } )
Templat pertanyaan mysql:
copy into @S3_TKXEL_DEVEOPMENT_STAGE/airflow/{{ dag.dag_id }}/{{ ds_nodash }}/{{ params.snowflake_table }}/{{ ds_nodash }}_{{ params.snowflake_table }}.csv from ( select * from {{ params.snowflake_table }} {% if params.delete_flag %} where created_at > {{ params.max_date}} {% endif %} ) file_format = (TYPE = CSV, COMPRESSION = NONE, NULL_IF=(''), field_optionally_enclosed_by='"' ) OVERWRITE = TRUE SINGLE = TRUE MAX_FILE_SIZE=5000000000;
Terima kasih terlebih dahulu kerana menambah pengetahuan saya.
Sama ada hendak menukar output
get_max_created_timestamp
:Bantuan, kini rentetan akan dipetik dengan betul untuk mewakili jangkaan rentetan kepingan salji.