Evan Crook
06/16/2021, 10:47 PM
extract -> map over `might_fail` -> map over `show`, and any failed mapped `might_fail` instance passes its result to `handle_failure`. `might_fail` also includes retries.
However, what I'm seeing is: the task `might_fail[3]` starts its task run, hits a `RuntimeError`, enters a `Retrying` state, and then finishes its task run with state `Retrying`. It does not do any retries. Then the rest of the mapped tasks finish and we proceed to `handle_failure`; and `handle_failure[3]` says "Not all upstream states are finished; ending run." (because the upstream state is `Retrying`, not `Failed`) and then "Finished task run for task with final state: 'Pending'". So `handle_failure` never actually runs at all.
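The behaviour described above can be illustrated with a small, self-contained toy model. This is not Prefect's source: the class names only mirror Prefect 0.x state names, and `run_handler_if_any_failed` is a hypothetical helper. It assumes, as the log message suggests, that the trigger is only evaluated once every upstream state is finished, and that `Retrying` is a kind of `Pending` state rather than a finished one.

```python
# Hypothetical, simplified model of the states discussed in this thread.
# NOT Prefect's implementation: it only mirrors the idea that Retrying is a
# kind of Pending state, so it never counts as "finished".


class State:
    def is_finished(self) -> bool:
        return isinstance(self, Finished)

    def is_failed(self) -> bool:
        return isinstance(self, Failed)


class Pending(State):
    pass


class Retrying(Pending):
    pass


class Finished(State):
    pass


class Success(Finished):
    pass


class Failed(Finished):
    pass


def run_handler_if_any_failed(upstream: list) -> str:
    """Mimic an any_failed-style trigger that first waits for all upstreams."""
    if not all(s.is_finished() for s in upstream):
        # Corresponds to "Not all upstream states are finished; ending run."
        return "Pending"
    if any(s.is_failed() for s in upstream):
        return "Running handler"
    return "TriggerFailed"


print(run_handler_if_any_failed([Retrying()]))  # Pending
print(run_handler_if_any_failed([Failed()]))    # Running handler
```

In this model the handler only runs once its upstream reaches a *final* `Failed` state; a `Retrying` upstream leaves it stuck in `Pending`, which matches what the log shows for `handle_failure[3]`.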
If I take out retries on `might_fail`, it works as expected. But in the real-world example this is mimicking, `might_fail` hits an API prone to rate limits / transient failures, so we actually want to retry it, and then trigger `handle_failure` only if it has entered a failed state after retrying a few times.
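A plain-Python sketch of the intended semantics (no Prefect involved; `call_with_retries` and `process` are hypothetical helpers): retry the flaky call a few times, and hand the error to the failure handler only once the retries are exhausted.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(fn: Callable[[], T], max_retries: int, delay_s: float) -> T:
    """Call fn; on an exception, retry up to max_retries more times, then re-raise."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: this is the genuinely failed case
            attempt += 1
            time.sleep(delay_s)


def process(x: int, max_retries: int = 3, delay_s: float = 0.01) -> str:
    def might_fail() -> int:
        # Pretend this hits a flaky API; x == 3 always fails, as in the thread
        if x == 3:
            raise RuntimeError("oh no")
        return x ** 2

    try:
        return f"look! it's {call_with_retries(might_fail, max_retries, delay_s)}"
    except RuntimeError as exc:
        # Only reached after every retry has failed
        return f"oh no this failed: {exc}"


for x in range(5):
    print(process(x))
```

The failure branch plays the role of `handle_failure` here: it sees the error only after `max_retries` extra attempts, never on the first transient failure.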
Does this make sense? Is this a bug or am I just doing something terribly wrong? (happy to provide logs from running this minimal example, too, if it's helpful)
thanks so much in advance! 🌟
Kevin Kho
Evan Crook
06/16/2021, 11:03 PM
0.14.17
Evan Crook
06/16/2021, 11:04 PM
```python
from datetime import timedelta

import prefect


@prefect.task()
def extract():
    return list(range(5))


@prefect.task(max_retries=3, retry_delay=timedelta(seconds=1))
def might_fail(x):
    # Pretend this task hits an API or something
    if x == 3:
        raise RuntimeError("oh no")
    return x ** 2


@prefect.task()
def show(data):
    print(f"look! it's {data}")


@prefect.task(trigger=prefect.triggers.any_failed)
def handle_failure(data):
    print(f"oh no this failed: {data}")


with prefect.Flow("what") as flow:
    data = extract()
    transformed = might_fail.map(data)
    handle_failures = handle_failure.map(transformed)
    print_results = show.map(transformed)

if __name__ == "__main__":
    # In a different module, i.e. a test module
    flow_runner = prefect.engine.FlowRunner(flow=flow, task_runner_cls=prefect.engine.TaskRunner)
    final_state = flow_runner.run(
        return_tasks=flow.tasks,
        executor=prefect.executors.LocalExecutor(),
    )
    print(final_state)
```
Kevin Kho
```python
from prefect import task, Flow
from prefect.executors import LocalExecutor
from datetime import timedelta


@task
def extract():
    return list(range(5))


@task(max_retries=3, retry_delay=timedelta(seconds=1))
def might_fail(x):
    # Pretend this task hits an API or something
    if x == 3:
        raise RuntimeError("oh no")
    return x ** 2


@task()
def show(data):
    print(f"look! it's {data}")


@task()
def handle_failure(data):
    print(f"oh no this failed: {data}")


with Flow("what") as flow:
    data = extract()
    transformed = might_fail.map(data)
    handle_failures = handle_failure.map(transformed)
    print_results = show.map(transformed)

flow.executor = LocalExecutor()
flow.run()
```
Kevin Kho
`FlowRunner` instead of `flow.run()`?
Evan Crook
06/17/2021, 3:54 PM
Evan Crook
06/17/2021, 3:55 PM
`flow.run()`)
Kevin Kho
`flow.run()` is the more accepted pattern to use. I wouldn't know without digging. It's actually the first time I've seen someone use this for local testing.
Evan Crook
06/17/2021, 4:01 PM
`.executor` property. That makes sense though!
Kevin Kho
```python
with Flow("what", executor=LocalExecutor()) as flow:
    data = extract()
    transformed = might_fail.map(data)
    handle_failures = handle_failure.map(transformed)
    print_results = show.map(transformed)
```
Evan Crook
06/17/2021, 5:16 PM
`CloudTaskRunner` by default, but I'd like to use a plain `TaskRunner` for local testing
Kevin Kho
Zach Angell
`flow.run()`, it seems easy enough to add that functionality if you'd like to create a GitHub issue. You should be able to get around this by setting the environment variable `PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS` to `prefect.engine.task_runner.TaskRunner`.
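A minimal sketch of that workaround from Python, assuming Prefect 0.x reads `PREFECT__*` variables when its configuration is loaded at import time, so the variable has to be set before `import prefect`. The helper `env_var_to_config_path` is hypothetical; it only spells out how the double-underscore name maps onto the nested config key that is later set directly as `prefect.config.engine.task_runner.default_class`.

```python
import os


def env_var_to_config_path(name: str, prefix: str = "PREFECT__") -> str:
    """Hypothetical helper: show which dotted config key a PREFECT__ variable targets.

    Assumes sections are separated by a double underscore and that keys are
    lowercased, matching the config path used later in this thread.
    """
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    return ".".join(part.lower() for part in name[len(prefix):].split("__"))


# Set this before `import prefect` (assumption: config is read at import time).
os.environ["PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS"] = (
    "prefect.engine.task_runner.TaskRunner"
)

print(env_var_to_config_path("PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS"))
# engine.task_runner.default_class
```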
As to the general difference between `flow.run` and `FlowRunner.run`, this is mostly Prefect internal logic: `flow.run()` actually calls `FlowRunner.run` and monitors the process to handle retries.
EDIT: changed `prefect.engine.cloud.TaskRunner` to `prefect.engine.task_runner.TaskRunner`
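A toy model of the difference Zach describes (not Prefect's actual code; `ToyTask`, `single_pass`, and `run_until_done` are hypothetical). A single runner pass leaves a failing-but-retryable task in `Retrying`, while a driver loop that keeps re-running passes until nothing is `Retrying` eventually reaches a final `Failed`, which is what a downstream `any_failed` task needs before it can run.

```python
import time
from dataclasses import dataclass
from typing import Dict

FINISHED = ("Success", "Failed")


@dataclass
class ToyTask:
    """Hypothetical task that fails its first `failures_before_success` attempts."""

    failures_before_success: int
    max_retries: int
    attempts: int = 0


def single_pass(tasks: Dict[str, ToyTask], states: Dict[str, str]) -> Dict[str, str]:
    """One runner pass: every unfinished task gets exactly one attempt."""
    for name, task in tasks.items():
        if states.get(name) in FINISHED:
            continue  # already in a final state, nothing to do
        task.attempts += 1
        if task.attempts > task.failures_before_success:
            states[name] = "Success"
        elif task.attempts <= task.max_retries:
            states[name] = "Retrying"  # a retry is scheduled, not executed here
        else:
            states[name] = "Failed"  # retries exhausted
    return states


def run_until_done(tasks: Dict[str, ToyTask], retry_delay_s: float = 0.0) -> Dict[str, str]:
    """Driver loop: keep re-running passes while any task is still Retrying."""
    states = single_pass(tasks, {})
    while "Retrying" in states.values():
        time.sleep(retry_delay_s)  # stand-in for waiting out retry_delay
        single_pass(tasks, states)
    return states


# A single pass leaves the always-failing task stuck in Retrying:
once = {"might_fail[2]": ToyTask(0, 3), "might_fail[3]": ToyTask(99, 3)}
print(single_pass(once, {}))

# The driver loop retries until that task reaches a final Failed state:
looped = {"might_fail[2]": ToyTask(0, 3), "might_fail[3]": ToyTask(99, 3)}
print(run_until_done(looped))
```

With `max_retries=3` the always-failing task is attempted four times (one initial run plus three retries) before it lands in `Failed`.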
Evan Crook
06/17/2021, 7:17 PM
```python
prefect.config.engine.task_runner.default_class = TaskRunner
flow.executor = LocalExecutor()
flow.run(parameters=params, runner_cls=FlowRunner)
```
(have to set `runner_cls=` to avoid it defaulting to a `CloudFlowRunner`, too)
I'll look into creating a GitHub issue/PR too if my team's bandwidth supports it.
Zach Angell