Muhammad Daniyal
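04/05/2022, 10:36 AM
```python
from prefect import task, Flow

def fun1():
    ...  # <some code here>

def fun2():
    ...  # <some code here>

def fun3():
    ...  # <some code here>

def fun4():
    ...  # <some code here>

def fun5():
    ...  # <some code here>

def fun6():
    ...  # <some code here>

def execute_flow():
    @task
    def t1():
        fun1()

    @task
    def t2():
        fun2()

    @task
    def t3(t1_result):  # the argument is what makes t3 depend on t1
        fun3()

    @task
    def t4(t2_result):  # the argument is what makes t4 depend on t2
        fun4()

    @task
    def t5():
        fun5()

    @task
    def t6(t5_result):  # the argument is what makes t6 depend on t5
        fun6()

    with Flow('my flow') as f:
        a = t1()
        b = t2()
        t3(a)
        t4(b)
        c = t5()
        d = t6(c)

    output = f.run()
    result = output.result[d].result  # public equivalent of ._result.value
    return result
```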
The expected behaviour of the flow would be t1 -> t2 -> t3 -> t4 -> t5 -> t6, but it is not working this way. Instead, this is what is happening:
t1 -> t5 {fails but shows 'success'} -> t6 {fails since the results from t5 are not appropriate} -> t2 -> t3 {shows a success message without even getting halfway through the function} -> t4 {same result as t3}
Except for t5 and t6, every task is time-consuming.
Anna Geller
04/05/2022, 10:39 AM
Muhammad Daniyal
04/05/2022, 10:45 AM
Anna Geller
04/05/2022, 10:48 AM
Muhammad Daniyal
04/05/2022, 10:51 AM
Anna Geller
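04/05/2022, 10:52 AM
```python
from prefect import task, Flow
from prefect.triggers import always_run

@task
def task_1():
    pass

@task
def task_2():
    pass

@task
def task_3(x):
    pass

@task
def task_4(x):
    pass

@task
def task_5():
    pass

@task
def task_6(x):
    pass

with Flow("flow_with_dependencies") as flow:
    t1 = task_1()
    t2 = task_2(upstream_tasks=[t1])  # state dependency: runs after t1, no data passed
    t3 = task_3(t1)  # data dependency: receives t1's result
    t4 = task_4(t2)
    # triggers are set via task_args; always_run runs the task even if an upstream task failed
    t5 = task_5(upstream_tasks=[t4], task_args=dict(trigger=always_run))
    t6 = task_6(t5, task_args=dict(trigger=always_run))
```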
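To make the two ideas in that snippet concrete, here is a toy, stdlib-only model; it is not Prefect's scheduler. It assumes Python 3.9+ for `graphlib`, and `run_flow`, `noop`, `boom` and the state strings are made up for this sketch; only the trigger names `all_successful` (the 1.x default) and `always_run` mirror Prefect 1.x. Tasks run in a topological order of their upstream dependencies, so tasks with no path between them (such as t2 and t5 in the question's flow) have no guaranteed relative order, and a trigger decides whether a task still runs after an upstream failure.

```python
from graphlib import TopologicalSorter  # Python 3.9+


# Toy triggers: the names mirror Prefect 1.x, the bodies are written for this sketch.
def all_successful(upstream_states):
    return all(state == "Success" for state in upstream_states)


def always_run(upstream_states):
    return True


def run_flow(tasks, upstream, triggers=None):
    """Run tasks (name -> callable) in a topological order of upstream
    (name -> set of upstream names), checking each task's trigger first."""
    triggers = triggers or {}
    graph = {name: set(upstream.get(name, ())) for name in tasks}
    order, states = [], {}
    for name in TopologicalSorter(graph).static_order():
        order.append(name)
        trigger = triggers.get(name, all_successful)  # default trigger
        if not trigger([states[up] for up in graph[name]]):
            states[name] = "TriggerFailed"
            continue
        try:
            tasks[name]()
            states[name] = "Success"
        except Exception:
            states[name] = "Failed"
    return order, states


def noop():
    pass


def boom():
    raise RuntimeError("simulated failure")


NAMES = ["t1", "t2", "t3", "t4", "t5", "t6"]

# The question's flow declared only t1 -> t3, t2 -> t4 and t5 -> t6, so any order
# respecting those three edges is valid; nothing forces t2 to run before t5.
question_order, _ = run_flow({n: noop for n in NAMES},
                             {"t3": {"t1"}, "t4": {"t2"}, "t6": {"t5"}})

# Dependencies as in the snippet above, with t4 made to fail.
upstream = {"t2": {"t1"}, "t3": {"t1"}, "t4": {"t2"}, "t5": {"t4"}, "t6": {"t5"}}
tasks = {n: noop for n in NAMES}
tasks["t4"] = boom

_, states = run_flow(tasks, upstream)
print(states["t5"], states["t6"])  # TriggerFailed TriggerFailed

order, states = run_flow(tasks, upstream, {"t5": always_run, "t6": always_run})
print(states["t5"], states["t6"])  # Success Success
```

With the default trigger, t4's failure cascades to t5 and t6; with `always_run` on both, they still execute.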
> There are somewhat high-scale tasks in the flow. In simple words, the flow is 'uploading data -> preprocess data -> convert data to desired format -> train models on converted data -> make predictions on them'
This should work fine in 2.0. By massive scale, I meant something more like running millions of tasks in parallel with DaskTaskRunner.
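A minimal sketch of why chaining outputs fixes the order of such a pipeline, using a thread pool as a stand-in for a parallel task runner. This is plain `concurrent.futures`, not Prefect or Dask; `run_pipeline` and `stage` are hypothetical names, and the stage names come from the pipeline described above. Every stage is submitted at once, yet each blocks until its upstream result is ready, so upload -> preprocess -> convert -> train -> predict holds regardless of scheduling.

```python
from concurrent.futures import ThreadPoolExecutor

STAGES = ["upload", "preprocess", "convert", "train", "predict"]


def run_pipeline(stages):
    """Submit every stage to a thread pool at once; each stage waits on
    the previous stage's future, so the chain still runs in order."""
    log = []

    def stage(name, upstream):
        # Block until the upstream result exists (None for the first stage).
        data = upstream.result() if upstream is not None else None
        log.append(name)
        return name if data is None else f"{name}({data})"

    with ThreadPoolExecutor(max_workers=len(stages)) as pool:
        future = None
        for name in stages:
            future = pool.submit(stage, name, future)
        final = future.result()
    return log, final


log, final = run_pipeline(STAGES)
print(log)    # ['upload', 'preprocess', 'convert', 'train', 'predict']
print(final)  # predict(train(convert(preprocess(upload))))
```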
Muhammad Daniyal
04/05/2022, 10:56 AM
Anna Geller
04/05/2022, 12:27 PM
> high computation
This wouldn't be a problem for Prefect, because your code runs in your own execution environment; only the number of orchestrated actions could push the API to its limits.
Re:
> problem is that they are not running in sequential order as defined in flow
You can solve this by attaching SequentialTaskRunner to your flow in 2.0, or LocalExecutor in 1.0.
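A stdlib-only sketch of the difference between running independent tasks concurrently and one at a time, assuming the 2.0 default task runner starts tasks concurrently. The helpers, names and durations below are hypothetical, chosen to resemble the question's slow t1-t4 and fast t5/t6; this is not Prefect's runner code. Concurrently started tasks finish in order of duration rather than the order they were written in, while a one-at-a-time loop preserves call order.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical durations: t1-t4 are slow, t5 and t6 are fast (as in the question).
DURATIONS = {"t1": 0.2, "t2": 0.2, "t3": 0.2, "t4": 0.2, "t5": 0.0, "t6": 0.0}
NAMES = list(DURATIONS)


def make_task(name, log):
    def run():
        time.sleep(DURATIONS[name])
        log.append(name)  # record the moment the task finishes
    return run


def run_concurrently(names):
    """Like a concurrent runner: everything starts at once, finish order varies."""
    log = []
    with ThreadPoolExecutor(max_workers=len(names)) as pool:
        for name in names:
            pool.submit(make_task(name, log))
    return log  # the with-block waits for all tasks before returning


def run_sequentially(names):
    """Like a sequential runner: one task at a time, in call order."""
    log = []
    for name in names:
        make_task(name, log)()
    return log


print(run_sequentially(NAMES))  # ['t1', 't2', 't3', 't4', 't5', 't6']
print(run_concurrently(NAMES))  # typically t5 and t6 finish first
```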