In Prefect 2.0b6 when using `DaskTaskRunner` tasks...
# prefect-community
a
In Prefect 2.0b6, when using `DaskTaskRunner`, tasks can stay in a pending state forever if they have custom dependencies (using `wait_for`) on tasks that have failed. The failed state of the upstream tasks does not cause the downstream tasks to fail. This isn't the case when using the default `ConcurrentTaskRunner`, where the failed state correctly propagates downstream and causes the dependent tasks to fail by default.
This can be replicated with the following code:
import time
from prefect import task, flow
from prefect.task_runners import DaskTaskRunner

@task
def this_will_fail(name):
    raise Exception(f"I will fail {name}")

@task
def always_pending():
    print("Sleeping for 3 secs")
    time.sleep(3)
    
@task
def normal_task():
    print("doing stuff")
    time.sleep(1)
    
@flow(task_runner=DaskTaskRunner)
def always_pending_flow():
    fail_a = this_will_fail('a')
    fail_b = this_will_fail('b')
    wont_fail = normal_task()
    always_pending(wait_for=[fail_a, wont_fail, fail_b])
    
if __name__ == "__main__": 
    state = always_pending_flow()
If the `task_runner` is changed to `ConcurrentTaskRunner`, the flow finishes correctly in a failed state. With `DaskTaskRunner`, however, the flow runs forever and the `always_pending` task stays in a pending state indefinitely.
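To make the expected behaviour concrete, here is a toy model of the propagation rule I'd expect for `wait_for`. This is only a sketch and not Prefect's actual code: `resolve_downstream` and the plain string states are hypothetical names I made up for illustration.

```python
# Toy model only: NOT Prefect's implementation. The function name and the
# plain string states are assumptions made up for this illustration.

def resolve_downstream(upstream_states):
    """Return the state a task should take given its wait_for upstreams."""
    states = list(upstream_states.values())
    # Any failed upstream should fail the downstream task immediately,
    # even if other upstream tasks are still running.
    if any(state == "FAILED" for state in states):
        return "FAILED"
    # Only run once every upstream task has completed.
    if all(state == "COMPLETED" for state in states):
        return "READY"
    # Otherwise keep waiting for the remaining upstream tasks.
    return "PENDING"


# Upstream states matching the flow above, once all three tasks have finished.
upstream = {"fail_a": "FAILED", "wont_fail": "COMPLETED", "fail_b": "FAILED"}
result = resolve_downstream(upstream)
print(result)
```

In this model `always_pending` ends up failed, which matches what I see with `ConcurrentTaskRunner`. With `DaskTaskRunner` it looks as though the task never leaves the pending branch.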
a
Thanks for raising this! I can reproduce it, and something does look wrong, either in the runner or in how we configure it in a flow.
prefect.exceptions.UpstreamTaskError: Upstream task run '2e84424d-89c0-4c68-bed1-bc9c58c9212c' did not reach a 'COMPLETED' state.
Let me open an issue @Marvin open "`DaskTaskRunner` struggles propagating state dependencies downstream when using `wait_for`"
a
Thank you Anna for opening this😀