Nicholas Crews
05/01/2023, 4:02 PM
Nate
05/01/2023, 6:50 PM
task_x into task_y as an argument, something like
import pandas as pd
from prefect import flow, task

@task
def get_customers() -> pd.DataFrame:
    df = pd.DataFrame()  # build df or something
    return df

@task
def transform_customers(df: pd.DataFrame) -> pd.DataFrame:
    # transform df or something
    return df

@flow
def customer_etl():
    customers = get_customers()
    transform_customers(customers)
where the Prefect engine now knows that the result of get_customers is an upstream dependency of transform_customers
2. depending on your use-case, you may be interested in creating your own Serializer. you can check this implementation as a reference, although I'm not sure if that would cover your desire to add custom save/load hooks. could you explain more what you mean by that?
3. hmm, the first thing that comes to mind is to use a hash of A's result as the cache key for B - would something like that work? you could write your own cache key fn
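a rough sketch of what such a cache key fn could look like, in case it helps. the name upstream_result_cache_key is made up for illustration, and it assumes the values passed into B pickle to the same bytes whenever they are equal, which is not guaranteed for every type. Prefect calls a cache_key_fn with the task run context and a dict of the task's call arguments, so when A's result is passed into B, hashing that dict ties B's cache to A's output:

```python
import hashlib
import pickle
from typing import Any, Dict, Optional


def upstream_result_cache_key(context: Any, parameters: Dict[str, Any]) -> Optional[str]:
    """Build a cache key by hashing the arguments the task was called with.

    `context` is unused here; it is kept only to match the two-argument
    shape that a cache key function receives.
    """
    hasher = hashlib.sha256()
    # Sort names so the key does not depend on argument ordering.
    for name in sorted(parameters):
        hasher.update(name.encode("utf-8"))
        # Assumption: equal values pickle to identical bytes.
        hasher.update(pickle.dumps(parameters[name]))
    return hasher.hexdigest()


# Hypothetical wiring (needs Prefect installed, so left as a comment):
#   @task(cache_key_fn=upstream_result_cache_key)
#   def task_b(a_result): ...
```

with this, B would be recomputed only when the value it receives from A changes. for large results the hashing cost may matter, so hashing a cheaper fingerprint of the result could be a better trade-off.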
Nicholas Crews
05/02/2023, 3:35 AM
import ibis
from prefect.runtime import task_run

def save(table: ibis.expr.types.Table):
    conn = ibis.connect("mydb.duckdb")
    table_name = task_run.name  # name of the current task run
    conn.create_table(table_name, table)