Alexander Kloumann
08/19/2022, 9:31 PMsource_ids = ["source_1", "source_2", "source_3"]
with Flow("my_flow") as flow:
for source_id in source_ids:
data = extract(source_id)
data = transform(data)
load(data)
return flow
Thanks in advance!Nate
08/19/2022, 9:56 PMsource_ids = ["source_1", "source_2", "source_3"]
with Flow("my_flow") as flow:
data = extract.map(source_ids)
transformed_data = transform.map(data)
load.map(transformed_data)
that way, you can respond to mapped task states independently using triggers
note that you don't need a return
statement here, since in prefect 1, flows are not function definitionsprefect>=2.0
, here is how a prefect 2 flow could look:
@flow
def my_flow(source_ids: List[str] = ["source_1", "source_2", "source_3"])
for source_id in source_ids:
data = extract(source_id)
data = transform(data)
load(data)
Jimmy Le
08/20/2022, 12:07 PMAlexander Kloumann
08/24/2022, 3:44 PM