Casey Green
08/24/2021, 3:35 PMKevin Kho
run_job
task, and then LOOP there. You can just do trigger_job.run()
and wait_to_complete.run()
inside the big task, but they will just be like functions instead of Prefect tasks.Casey Green
08/24/2021, 3:54 PMCasey Green
08/24/2021, 4:15 PMwith Flow("test") as flow:
flow_name = Parameter("flow_name", required=True)
handle = trigger_job(job_name)
result = wait_to_complete(handle)
max_runs = 3
# maybe this isn't a case... perhaps a terminal_state_handler that inspects the result and sets the state appropriately?
with case(should_re_run(result), True):
# trigger flow failure, but automatically retry.
Kinda looks and feels like a code smell 🤷♂️Casey Green
08/24/2021, 4:17 PMCasey Green
08/24/2021, 4:17 PMKevin Kho
StartFlowRun
task or create_flow_run
task, then we have retries, but the concept of a flow retry is not definitive because some users expect everything to run (even successful tasks) and some expect to run from where it left off. I think retries on these two Prefect tasks can be used for both scenarios by supplying a idempotency_key
Casey Green
08/24/2021, 5:00 PM