Andreas
06/15/2022, 10:17 AM
With the `DaskTaskRunner`, tasks can stay in a `Pending` state forever if they have custom dependencies (set with `wait_for`) on tasks that have failed: the failed state of the upstream tasks does not cause the downstream tasks to fail. This isn't the case with the default `ConcurrentTaskRunner`, where the failed state correctly propagates downstream and causes the dependent tasks to fail by default.
```python
import time

from prefect import task, flow
from prefect.task_runners import DaskTaskRunner


@task
def this_will_fail(name):
    raise Exception(f"I will fail {name}")


@task
def always_pending():
    print("Sleeping for 3 secs")
    time.sleep(3)


@task
def normal_task():
    print("doing stuff")
    time.sleep(1)


@flow(task_runner=DaskTaskRunner)
def always_pending_flow():
    fail_a = this_will_fail("a")
    fail_b = this_will_fail("b")
    wont_fail = normal_task()
    # Should fail because fail_a and fail_b failed; under Dask it stays Pending
    always_pending(wait_for=[fail_a, wont_fail, fail_b])


if __name__ == "__main__":
    state = always_pending_flow()
```
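To illustrate the behavior expected from `wait_for` without needing Prefect or Dask installed, here is a minimal stdlib sketch, assuming a `ThreadPoolExecutor` stands in for the task runner. `submit_with_wait_for` and `UpstreamFailed` are hypothetical names, not Prefect APIs, and this is not how `DaskTaskRunner` or `ConcurrentTaskRunner` are implemented. It only shows the rule that a downstream task waiting on failed upstreams should itself fail instead of staying pending.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable


class UpstreamFailed(Exception):
    """Hypothetical stand-in for prefect.exceptions.UpstreamTaskError."""


def submit_with_wait_for(
    pool: ThreadPoolExecutor,
    fn: Callable[[], object],
    wait_for: Iterable[Future] = (),
) -> Future:
    """Submit fn so it starts only after every future in wait_for is done."""
    upstream = list(wait_for)

    def run() -> object:
        # Block until every upstream future has finished, successfully or not.
        wait(upstream)
        failed = [f for f in upstream if f.exception() is not None]
        if failed:
            # Fail the downstream task instead of leaving it pending.
            raise UpstreamFailed(f"{len(failed)} upstream task(s) did not complete")
        return fn()

    return pool.submit(run)


def this_will_fail(name: str) -> None:
    raise Exception(f"I will fail {name}")


def normal_task() -> str:
    return "doing stuff"


def always_pending() -> str:
    return "ran"


def run_demo() -> str:
    # Mirrors always_pending_flow from the report, using threads instead of Dask.
    with ThreadPoolExecutor(max_workers=4) as pool:
        fail_a = submit_with_wait_for(pool, lambda: this_will_fail("a"))
        fail_b = submit_with_wait_for(pool, lambda: this_will_fail("b"))
        wont_fail = submit_with_wait_for(pool, normal_task)
        downstream = submit_with_wait_for(
            pool, always_pending, wait_for=[fail_a, wont_fail, fail_b]
        )
        try:
            # The timeout makes a stuck downstream raise TimeoutError here
            # rather than blocking this call forever.
            downstream.result(timeout=5)
        except UpstreamFailed as exc:
            return f"downstream failed: {exc}"
    return "downstream completed"


print(run_demo())  # prints "downstream failed: 2 upstream task(s) did not complete"
```

Upstream tasks are submitted before the task that waits on them, so with the executor's FIFO queue the waiting worker never starves the upstream work, even with a single worker thread.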
If the task runner is changed to `ConcurrentTaskRunner`, the flow correctly finishes in a `Failed` state. With `DaskTaskRunner`, though, the flow runs forever because the `always_pending` task stays in a `Pending` state indefinitely.
Anna Geller
prefect.exceptions.UpstreamTaskError: Upstream task run '2e84424d-89c0-4c68-bed1-bc9c58c9212c' did not reach a 'COMPLETED' state.
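The error above is what the dependent task is expected to end with. A simplified sketch of that resolution rule, assuming a hypothetical four-value `StateType` enum (Prefect's real state model has more state types) and hypothetical `UpstreamRun`/`resolve_wait_for` names: the downstream stays pending while any upstream is unfinished, fails once all are finished but one did not complete, and otherwise runs. If a runner never surfaced an upstream's terminal `Failed` state to a check like this, the downstream would stay pending, which would match the symptom here; that is a guess, not a confirmed root cause.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Sequence


class StateType(Enum):
    # Simplified, hypothetical subset of task-run state types.
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


TERMINAL = {StateType.COMPLETED, StateType.FAILED}


@dataclass
class UpstreamRun:
    run_id: str
    state: StateType


class UpstreamTaskError(Exception):
    """Hypothetical stand-in for prefect.exceptions.UpstreamTaskError."""


def resolve_wait_for(upstream: Sequence[UpstreamRun]) -> StateType:
    """Decide whether a downstream task may start, must wait, or must fail."""
    if any(run.state not in TERMINAL for run in upstream):
        # Some upstream has not finished yet: the downstream stays pending.
        return StateType.PENDING
    for run in upstream:
        if run.state is not StateType.COMPLETED:
            # Every upstream is finished, but at least one did not succeed.
            raise UpstreamTaskError(
                f"Upstream task run '{run.run_id}' did not reach a 'COMPLETED' state."
            )
    # All upstreams completed, so the downstream is allowed to run.
    return StateType.RUNNING


print(resolve_wait_for([UpstreamRun("a", StateType.COMPLETED)]))  # StateType.RUNNING
```

This sketch waits for every upstream before deciding; a runner could instead fail fast on the first failed upstream. Which choice Prefect makes is not stated in this thread.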
Let me open an issue
@Marvin open "`DaskTaskRunner` struggles propagating state dependencies downstream when using `wait_for`"
Marvin
06/15/2022, 11:25 AM
Andreas
06/15/2022, 11:47 AM