Andreas

06/15/2022, 10:17 AM
In Prefect 2.0b6, when using `DaskTaskRunner`, tasks can stay in a pending state forever if they have custom dependencies (via `wait_for`) on tasks that have failed. The failed state of the upstream tasks does not cause the downstream tasks to fail. This isn't the case with the default `ConcurrentTaskRunner`, where the failed state correctly propagates downstream and causes the dependent tasks to fail by default.
This can be replicated with the following code:
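```python
import time
from prefect import task, flow
from prefect.task_runners import DaskTaskRunner  # import path as used with Prefect 2.0b6 in this report

@task
def this_will_fail(name):
    # Upstream task that always fails
    raise Exception(f"I will fail {name}")

@task
def always_pending():
    # Downstream task: expected to fail because two upstream tasks failed,
    # but with DaskTaskRunner it stays Pending indefinitely
    print("Sleeping for 3 secs")
    time.sleep(3)

@task
def normal_task():
    print("doing stuff")
    time.sleep(1)

@flow(task_runner=DaskTaskRunner)
def always_pending_flow():
    fail_a = this_will_fail('a')
    fail_b = this_will_fail('b')
    wont_fail = normal_task()
    # Custom dependencies via wait_for: two failing upstream tasks and one successful one
    always_pending(wait_for=[fail_a, wont_fail, fail_b])

if __name__ == "__main__":
    state = always_pending_flow()
```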
If the `task_runner` is changed to `ConcurrentTaskRunner`, the flow finishes correctly in a failed state. With `DaskTaskRunner`, however, the flow runs forever because the `always_pending` task stays in a pending state indefinitely.
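For reference, the propagation the reporter expects (and sees with `ConcurrentTaskRunner`) can be modeled in plain Python. This is a minimal sketch using only the standard library's `concurrent.futures`, not Prefect's implementation. `run_task` and `UpstreamNotCompleted` are hypothetical names, and the error message only imitates the `UpstreamTaskError` text quoted later in this thread. The downstream task checks each `wait_for` future and fails immediately if any upstream failed, instead of waiting forever.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class UpstreamNotCompleted(Exception):
    """Hypothetical stand-in for Prefect's UpstreamTaskError."""


def run_task(executor, fn, *args, wait_for=()):
    """Submit fn so it only runs if every upstream future completed (hypothetical helper).

    Upstream futures must be submitted before this call; with the executor's
    FIFO queue, that ordering keeps a blocked downstream task from starving them.
    """
    def wrapper():
        for i, upstream in enumerate(wait_for):
            # .exception() blocks until the upstream future is done and
            # returns its exception, or None if it completed successfully.
            if upstream.exception() is not None:
                raise UpstreamNotCompleted(
                    f"Upstream task {i} did not reach a 'COMPLETED' state."
                )
        return fn(*args)

    return executor.submit(wrapper)


def this_will_fail(name):
    raise Exception(f"I will fail {name}")


def normal_task():
    time.sleep(0.1)
    return "done"


def downstream_task():
    return "ran"


with ThreadPoolExecutor(max_workers=4) as pool:
    fail_a = run_task(pool, this_will_fail, "a")
    fail_b = run_task(pool, this_will_fail, "b")
    wont_fail = run_task(pool, normal_task)
    result = run_task(pool, downstream_task, wait_for=[fail_a, wont_fail, fail_b])

# The downstream task ends in a failed state instead of pending forever.
print(type(result.exception()).__name__)  # UpstreamNotCompleted
```

The check fails fast on the first failed upstream, which mirrors the "did not reach a 'COMPLETED' state" behavior described above. A real task runner would also record a distinct state (rather than a bare exception) for the skipped task.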

Anna Geller

06/15/2022, 11:25 AM
Thanks for raising this! I can reproduce it and sense that something is wrong, either in the runner or in how we configure it in a flow.
prefect.exceptions.UpstreamTaskError: Upstream task run '2e84424d-89c0-4c68-bed1-bc9c58c9212c' did not reach a 'COMPLETED' state.
Let me open an issue.
@Marvin open "`DaskTaskRunner` struggles propagating state dependencies downstream when using `wait_for`"

Andreas

06/15/2022, 11:47 AM
Thank you, Anna, for opening this 😀