Joao Moniz
04/25/2024, 2:34 PM

import asyncio
from prefect import flow
from prefect.deployments import run_deployment


@flow
async def main():
    db_tables = [list_of_table_names...]  # loaded from a Prefect config block
    await asyncio.gather(
        *[
            run_deployment(
                name="deployment/name",
                parameters={
                    "db_table": db_table,
                },
                flow_run_name=f"ingest_{db_table}",
            )
            for db_table in db_tables
        ]
    )


if __name__ == "__main__":
    asyncio.run(main())
This allows us to run each flow as an independent process in our EKS infra, with each flow run getting its own pod. We also have a work queue configured on the child deployment with a concurrency limit of 10, which applies to all flow runs from that deployment and keeps us from overloading our infra. Basically the relation is:
1 main flow/deployment -> triggers the child deployment N times, producing N flow runs with different parameters
Today we had an issue where one of the subflow runs was created twice (same table, two flow runs), which is very odd, as we expected exactly one flow run per table in the Prefect block configuration. Questions:
1. Does anyone have any idea why this might have happened? We use this same approach for parallel execution in other deployments and have never seen this issue before (but we don't use work queues with concurrency limits in those).
2. Is there a safer/more elegant way of running these flows in parallel without needing two deployments (one main deployment just for triggering, and one with the actual flow logic)?
Joao Moniz
04/29/2024, 6:19 AM
Duplicate flow runs can be prevented with the idempotency_key parameter of the run_deployment function.
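
[Editor's note] A minimal sketch of how that idempotency_key could be wired into the snippet above. The per-table, per-day key format, the placeholder table names, and the deployment name are assumptions for illustration, not the original values:

import asyncio
from datetime import date

from prefect import flow
from prefect.deployments import run_deployment


@flow
async def main():
    db_tables = ["table_a", "table_b"]  # placeholder table names
    today = date.today().isoformat()
    await asyncio.gather(
        *[
            run_deployment(
                name="deployment/name",
                parameters={"db_table": db_table},
                flow_run_name=f"ingest_{db_table}",
                # Reusing the same key for the same table on the same day means a
                # duplicate trigger returns the existing flow run instead of
                # creating a second one.
                idempotency_key=f"ingest_{db_table}_{today}",
            )
            for db_table in db_tables
        ]
    )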