Muhammad Daniyal
04/05/2022, 10:36 AM

from prefect import task, Flow

def fun1(): <some code here>
def fun2(): <some code here>
def fun3(): <some code here>
def fun4(): <some code here>
def fun5(): <some code here>
def fun6(): <some code here>

def execute_flow():
    @task
    def t1():
        fun1()

    @task
    def t2():
        fun2()

    @task
    def t3(a):  # receives the result of t1, as bound in the Flow block below
        fun3()

    @task
    def t4(b):  # receives the result of t2
        fun4()

    @task
    def t5():
        fun5()

    @task
    def t6(c):  # receives the result of t5
        fun6()

    with Flow('my flow') as f:
        a = t1()
        b = t2()
        t3(a)
        t4(b)
        c = t5()
        d = t6(c)

    output = f.run()
    result = output.result[d]._result.value  # output.result[d].result is the public accessor
    return result
The expected behaviour of the flow would be t1 -> t2 -> t3 -> t4 -> t5 -> t6, but it is not working this way. Instead, this is what is happening:
t1 -> t5 {fails but shows 'success'} -> t6 {fails since the result from t5 is not appropriate} -> t2 -> t3 {shows a success message without even getting halfway through the function} -> t4 {same result as t3}
Except for t5 and t6, every task is time-consuming.
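As a debugging aid (a sketch, not part of the thread): instead of reaching into the private _result attribute, the task states returned by f.run() can be inspected directly to see which tasks the Prefect 1.0 runner actually marked successful and what each returned. This reuses the names a, b, c, d and output from the snippet above.

# Sketch: after output = f.run() inside execute_flow() above, inspect each
# task's final state rather than its private _result attribute.
for task_ref in (a, b, c, d):
    task_state = output.result[task_ref]
    # is_successful() / is_failed() report the final state of each task;
    # .result is the public accessor for its return value
    print(task_ref.name, task_state, task_state.is_successful(), task_state.result)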
Anna Geller
from prefect import task, Flow
from prefect.triggers import always_run

@task
def task_1():
    pass

@task
def task_2():
    pass

@task
def task_3(upstream_result):
    pass

@task
def task_4(upstream_result):
    pass

@task
def task_5():
    pass

@task
def task_6(upstream_result):
    pass

with Flow("flow_with_dependencies") as flow:
    t1 = task_1()
    t2 = task_2(upstream_tasks=[t1])  # state-only dependency, no data passed
    t3 = task_3(t1)                   # data dependency on t1
    t4 = task_4(t2)
    # always_run: run even if an upstream task failed
    t5 = task_5(upstream_tasks=[t4], task_args=dict(trigger=always_run))
    t6 = task_6(t5, task_args=dict(trigger=always_run))
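A possible variation on the snippet above (a sketch, not from the original reply): in Prefect 1.0 the trigger can also be declared on the task definition itself, which keeps the call sites inside the Flow block unchanged.

from prefect import task
from prefect.triggers import always_run

# Equivalent to passing task_args=dict(trigger=always_run) at call time:
# the task runs regardless of whether its upstream tasks succeeded.
@task(trigger=always_run)
def task_5():
    pass

@task(trigger=always_run)
def task_6(upstream_result):
    pass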
Anna Geller
Re: "somewhat high scale tasks are there in the flow. if i say it in simple words, the flow is 'uploading data -> preprocess data -> convert data to desired format -> train models on converted data -> make predictions on them'"
This should work fine in 2.0. By massive scale, I meant more like running millions of tasks in parallel with DaskTaskRunner.
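For context on the DaskTaskRunner mention, a hedged sketch of parallel task execution in Prefect 2.x follows; the flow and task names are illustrative. In the early 2.0 betas the runner shipped in prefect.task_runners, while current 2.x releases provide it through the prefect-dask collection.

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def process_chunk(i: int) -> int:
    return i * i

@flow(task_runner=DaskTaskRunner())
def parallel_flow(n: int = 100):
    # .submit() hands each task to the Dask task runner so calls can run in parallel
    futures = [process_chunk.submit(i) for i in range(n)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    parallel_flow()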
Anna Geller
Re: "high computation"
This wouldn't be a problem for Prefect, because your code runs in your own execution environment - only the number of orchestratable actions could bring the API to its limits in any way.
Re: "problem is that they are not running in sequential order as defined in the flow"
You can solve that by attaching a SequentialTaskRunner to your flow in 2.0, or a LocalExecutor in 1.0.
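A minimal sketch of both suggestions, with placeholder flow and task names (not from the thread):

# Prefect 2.x: force one-task-at-a-time execution with SequentialTaskRunner.
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task
def step_one():
    ...

@task
def step_two():
    ...

@flow(task_runner=SequentialTaskRunner())
def sequential_flow():
    step_one()
    step_two()

# Prefect 1.x equivalent: attach a LocalExecutor (the non-parallel default)
# when running the flow, e.g.
#   from prefect.executors import LocalExecutor
#   flow.run(executor=LocalExecutor())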