# ask-community
e
hello all! I'm encountering a weird condition that seems like a bug, or maybe I'm just not understanding it properly. (Minimal example in thread.) Essentially, we want to run a pipeline of

extract -> map over might_fail -> map over show

and any failed mapped `might_fail` instance passes its result to `handle_failure`. `might_fail` also includes retries. However, what I'm seeing is: the task `might_fail[3]` starts its task run, hits a `RuntimeError`, enters a `Retrying` state, and then finishes its task run with state `Retrying`. It does not do any retries. Then the rest of the mapped tasks finish and we proceed to `handle_failure`; `handle_failure[3]` says `"Not all upstream states are finished; ending run."` (because the upstream state is `Retrying`, not `Failed`) and then `Finished task run for task with final state: 'Pending'`. So `handle_failure` never actually runs at all. If I take out retries on `might_fail`, it works as expected. But in the real-world example this is mimicking, `might_fail` hits an API prone to rate limits and transient failures, so we actually want to retry it, and then trigger `handle_failure` only if it has entered a failed state after retrying a few times. Does this make sense? Is this a bug, or am I just doing something terribly wrong? (Happy to provide logs from running this minimal example, too, if it's helpful.) Thanks so much in advance! 🌟
k
Hey @Evan Crook, I think this makes sense. Could you move the code block to the thread to conserve space in the main channel? I'll try to replicate later. What version of Prefect are you on? Thanks for the minimal example.
e
sure, will do. I'm on `0.14.17`.
```python
from datetime import timedelta

import prefect


@prefect.task()
def extract():
    return list(range(5))


@prefect.task(max_retries=3, retry_delay=timedelta(seconds=1))
def might_fail(x):
    # Pretend this task hits an API or something
    if x == 3:
        raise RuntimeError("oh no")
    return x ** 2


@prefect.task()
def show(data):
    print(f"look! it's {data}")


@prefect.task(trigger=prefect.triggers.any_failed)
def handle_failure(data):
    print(f"oh no this failed: {data}")


with prefect.Flow("what") as flow:
    data = extract()
    transformed = might_fail.map(data)
    handle_failures = handle_failure.map(transformed)
    print_results = show.map(transformed)

if __name__ == "__main__":
    # In a different module, i.e. a test module
    flow_runner = prefect.engine.FlowRunner(flow=flow, task_runner_cls=prefect.engine.TaskRunner)
    # A single FlowRunner.run pass: tasks left in Retrying are not re-run here
    final_state = flow_runner.run(
        return_tasks=flow.tasks,
        executor=prefect.executors.LocalExecutor()
    )
    print(final_state)
```
k
Hey @Evan Crook, I wrote the code a bit differently and I am seeing retries on 0.14.17.
```python
from datetime import timedelta

from prefect import task, Flow
from prefect.executors import LocalExecutor

@task
def extract():
    return list(range(5))

@task(max_retries=3, retry_delay=timedelta(seconds=1))
def might_fail(x):
    # Pretend this task hits an API or something
    if x == 3:
        raise RuntimeError("oh no")
    return x ** 2

@task()
def show(data):
    print(f"look! it's {data}")

@task()
def handle_failure(data):
    print(f"oh no this failed: {data}")

with Flow("what") as flow:
    data = extract()
    transformed = might_fail.map(data)
    handle_failures = handle_failure.map(transformed)
    print_results = show.map(transformed)

flow.executor = LocalExecutor()
flow.run()  # flow.run() keeps re-running tasks left in Retrying, so retries happen here
```
Is there any reason you need to use `FlowRunner` instead of `flow.run()`?
e
not specifically -- I just thought that was a generally accepted pattern to use. Thanks for the reply! I have to admit I'm curious, though, why the behavior is different in these two cases (`FlowRunner` vs. `flow.run()`).
k
`flow.run()` is the more accepted pattern. I wouldn't know why the behavior differs without digging in; it's actually the first time I've seen someone use `FlowRunner` directly for local testing.
e
I think I was under the impression you had to do that to specify the executor (e.g. as a `LocalExecutor`) and wasn't aware you could just assign to the `.executor` property. That makes sense though!
k
You can also do:
```python
with Flow("what", executor=LocalExecutor()) as flow:
    data = extract()
    transformed = might_fail.map(data)
    handle_failures = handle_failure.map(transformed)
    print_results = show.map(transformed)
```
e
Can you specify a task runner class in this? It is trying to use `CloudTaskRunner` by default, but I'd like to use a plain `TaskRunner` for local testing.
k
I’ll ask someone who knows more than me about this and get back to you.
z
hi @Evan Crook, right now you can't specify a task runner class when calling `flow.run()`. It seems easy enough to add that functionality if you'd like to create a GitHub issue. You should be able to get around this by setting the environment variable `PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS` to `prefect.engine.task_runner.TaskRunner`.
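The variable name encodes the config path: Prefect's convention is a `PREFECT__` prefix followed by the nested config keys joined with double underscores, so this variable corresponds to the `engine.task_runner.default_class` setting (the same one assigned directly via `prefect.config` later in this thread). The hypothetical `env_to_config_path` helper below only illustrates that naming convention, assuming keys are lowercased; it is not Prefect's own loader, which handles more cases.

```python
from typing import Dict, Tuple

PREFIX = "PREFECT__"


def env_to_config_path(environ: Dict[str, str]) -> Dict[Tuple[str, ...], str]:
    """Map PREFECT__A__B__C=value to the nested key path ('a', 'b', 'c').

    Hypothetical helper illustrating the naming convention only.
    """
    paths = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue  # unrelated variables are ignored
        # Drop the prefix, split on the double underscore, lowercase each key.
        path = tuple(part.lower() for part in name[len(PREFIX):].split("__"))
        paths[path] = value
    return paths


env = {
    "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.task_runner.TaskRunner",
    "HOME": "/home/me",
}
print(env_to_config_path(env))
# {('engine', 'task_runner', 'default_class'): 'prefect.engine.task_runner.TaskRunner'}
```

Note that single underscores inside a key (as in `TASK_RUNNER`) survive the split. In Prefect 0.x the configuration is loaded when `prefect` is imported, so the variable presumably needs to be in the environment before the test process imports Prefect.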
As to the general difference between `flow.run()` and `FlowRunner.run`, this is mostly Prefect internal logic: `flow.run()` actually calls `FlowRunner.run` and monitors the process to handle retries. (EDIT: changed `prefect.engine.cloud.TaskRunner` to `prefect.engine.task_runner.TaskRunner`.)
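That division of labour can be illustrated with a toy model, assuming (as described above) that a single runner pass leaves a failing task in `Retrying` and an outer loop is responsible for waiting and re-running it. Everything here (`single_pass`, `run_with_retries`, the attempt counting) is hypothetical and only mimics the observed behaviour; it is not Prefect's code.

```python
import time
from typing import Callable


def single_pass(task: Callable[[], int], attempt: int, max_retries: int) -> str:
    """One runner pass: like a bare FlowRunner.run call, it does not wait to retry."""
    try:
        task()
        return "Success"
    except Exception:
        # While retries remain, the pass ends in the non-final Retrying state.
        return "Retrying" if attempt < max_retries else "Failed"


def run_with_retries(task: Callable[[], int], max_retries: int = 3,
                     retry_delay: float = 0.0) -> str:
    """Outer loop: like flow.run(), re-invoke the pass while the state is Retrying."""
    attempt = 0
    state = single_pass(task, attempt, max_retries)
    while state == "Retrying":
        time.sleep(retry_delay)  # honour the retry delay before the next pass
        attempt += 1
        state = single_pass(task, attempt, max_retries)
    return state


def always_fails() -> int:
    raise RuntimeError("oh no")


print(single_pass(always_fails, attempt=0, max_retries=3))  # Retrying
print(run_with_retries(always_fails, max_retries=3))  # Failed
```

In this model, the bare `FlowRunner.run` call from the original example corresponds to `single_pass`, which matches the `Retrying` final state reported for `might_fail[3]`. Only once the outer loop has exhausted the retries and reached `Failed` can the `any_failed` trigger on `handle_failure` fire.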
e
@Zach Angell thanks for the advice and clarification! Inspired by this, instead of setting the env variable I tried setting the config directly from the testing code, and this worked:
```python
import prefect
from prefect.engine import FlowRunner, TaskRunner
from prefect.executors import LocalExecutor
prefect.config.engine.task_runner.default_class = TaskRunner
flow.executor = LocalExecutor()
flow.run(parameters=params, runner_cls=FlowRunner)  # `flow` and `params` come from the test setup
```
(I have to set `runner_cls=` to avoid it defaulting to a `CloudFlowRunner`, too.) I'll look into creating a GitHub issue/PR too if my team's bandwidth supports it.
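Assigning to `prefect.config` from test code changes a process-wide setting, so later tests in the same session would also see `TaskRunner`. One way to keep the override local is a context manager that restores the previous value on exit. The sketch below uses a hypothetical `temporary_setting` helper and a `SimpleNamespace` stand-in for the config object (an assumption: attribute-style nested access, as in `prefect.config.engine.task_runner.default_class`). Prefect 0.x also ships `prefect.utilities.configuration.set_temporary_config`, which may serve the same purpose if your version provides it.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def temporary_setting(obj, dotted_key: str, value):
    """Temporarily set obj.<dotted.key> = value, restoring the old value on exit."""
    *parents, leaf = dotted_key.split(".")
    target = obj
    for name in parents:  # walk down to the object that owns the leaf key
        target = getattr(target, name)
    old = getattr(target, leaf)
    setattr(target, leaf, value)
    try:
        yield
    finally:
        setattr(target, leaf, old)  # restore even if the test body raises


# Stand-in for prefect.config; the default value here is illustrative only.
config = SimpleNamespace(
    engine=SimpleNamespace(task_runner=SimpleNamespace(default_class="CloudTaskRunner"))
)

with temporary_setting(config, "engine.task_runner.default_class", "TaskRunner"):
    print(config.engine.task_runner.default_class)  # TaskRunner
print(config.engine.task_runner.default_class)  # CloudTaskRunner
```

The `finally` clause is the design point: a failing assertion inside the `with` block still leaves the shared config as it was for the next test.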
z
nice! yep setting that config works just as well