In Prefect 2.0b6 when using `DaskTaskRunner` tasks...
# prefect-community
In Prefect 2.0b6, when using `DaskTaskRunner`, tasks can stay in a pending state forever if they have custom dependencies (using `wait_for`) on tasks that have failed. The failed state of the upstream tasks does not cause the downstream tasks to fail. This isn't the case when using the default task runner, where the failed state correctly propagates downwards and causes the dependent tasks to fail by default.
This can be replicated with the following code:
import time
from prefect import task, flow
from prefect.task_runners import DaskTaskRunner


@task
def this_will_fail(name):
    # Upstream task that always raises
    raise Exception(f"I will fail {name}")


@task
def always_pending():
    # Downstream task that should never run, since its upstreams fail
    print("Sleeping for 3 secs")
    time.sleep(3)


@task
def normal_task():
    print("doing stuff")


@flow(task_runner=DaskTaskRunner())
def always_pending_flow():
    fail_a = this_will_fail('a')
    fail_b = this_will_fail('b')
    wont_fail = normal_task()
    always_pending(wait_for=[fail_a, wont_fail, fail_b])


if __name__ == "__main__":
    state = always_pending_flow()
If the task_runner is changed to the default one, the flow finishes correctly in a failed state. This isn't the case with `DaskTaskRunner`, though: the flow runs forever because the `always_pending` task stays in a pending state indefinitely.
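To make the expected behaviour concrete, here is a rough plain-Python sketch of how an upstream failure should propagate through `wait_for` dependencies. It is only a toy model and not how Prefect is implemented: `resolve_states`, the state strings and the dict layout are hypothetical names made up for illustration, and it assumes the dependency graph has no cycles.

```python
# Toy model of upstream-failure propagation; not Prefect's actual engine.
from typing import Dict, List


def resolve_states(results: Dict[str, bool], wait_for: Dict[str, List[str]]) -> Dict[str, str]:
    """Return a final state name for every task.

    results:  task name -> True if the task body would succeed, False if it raises.
    wait_for: task name -> names of the upstream tasks it waits on.
    """
    states: Dict[str, str] = {}

    def visit(name: str) -> str:
        if name in states:
            return states[name]
        # Resolve every upstream dependency first.
        upstream = [visit(dep) for dep in wait_for.get(name, [])]
        if any(state != "COMPLETED" for state in upstream):
            # An upstream did not complete, so fail rather than stay pending.
            states[name] = "FAILED"
        else:
            states[name] = "COMPLETED" if results[name] else "FAILED"
        return states[name]

    for task_name in results:
        visit(task_name)
    return states


states = resolve_states(
    results={"fail_a": False, "fail_b": False, "wont_fail": True, "always_pending": True},
    wait_for={"always_pending": ["fail_a", "wont_fail", "fail_b"]},
)
print(states["always_pending"])  # FAILED
```

With the same task names as the flow above, `always_pending` ends up failed as soon as its upstreams are resolved, which matches what the default task runner does and is what I would expect from `DaskTaskRunner` as well.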
Thanks for raising this! I can reproduce and sense that something is wrong, either on the runner or in how we configure that in a flow.
prefect.exceptions.UpstreamTaskError: Upstream task run '2e84424d-89c0-4c68-bed1-bc9c58c9212c' did not reach a 'COMPLETED' state.
Let me open an issue @Marvin open "`DaskTaskRunner` struggles propagating state dependencies downstream when using `wait_for`"
Thank you Anna for opening this😀