
Muhammad Daniyal

04/05/2022, 10:36 AM
Hello everyone, hope you are all doing well. I have just started with Prefect. I have created a workflow consisting of 6 tasks, defined as follows:
def fun1(): <some code here>
def fun2(): <some code here>
def fun3(): <some code here>
def fun4(): <some code here>
def fun5(): <some code here>
def fun6(): <some code here>

def execute_flow():
	
	@task
	def t1():
		fun1()
	@task
	def t2():
		fun2()
	@task
	def t3():
		fun3()
	@task
	def t4():
		fun4()
	@task
	def t5():
		fun5()
	@task
	def t6():
		fun6()
		
	
	with Flow('my flow') as f:
		
		a = t1()
		b = t2()
		t3(a)
		t4(b)
		c = t5()
		d = t6(c)
		
	output = f.run()
	
	result = output.result[d]._result.value
	return result
The expected behaviour of the flow would be t1 -> t2 -> t3 -> t4 -> t5 -> t6, but it is not working this way. Instead, this is what happens: t1 -> t5 (fails but shows 'success') -> t6 (fails, since the results from t5 are not appropriate) -> t2 -> t3 (shows a success message without even getting halfway through the function) -> t4 (same result as t3). Except for t5 and t6, every task is time-consuming.
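The root cause: Prefect 1.0 builds a DAG from the dependencies you declare (data passed between task calls, or explicit upstream_tasks), not from the textual order of the calls inside the `with Flow(...)` block. In the flow above, only `t3(a)`, `t4(b)` and `t6(c)` create edges, so `t1`, `t2` and `t5` have no upstream tasks and are all eligible to start immediately. A small stdlib sketch of the graph the scheduler sees (node names mirror the flow above; `graphlib` here only illustrates the ordering, it is not part of Prefect):

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Only data dependencies become edges in the flow above:
# t3 needs t1's output, t4 needs t2's, t6 needs t5's.
deps = {
    "t3": {"t1"},
    "t4": {"t2"},
    "t6": {"t5"},
}

ts = TopologicalSorter(deps)
ts.prepare()

# Nodes with no predecessors may all start at once, in any order.
ready = sorted(ts.get_ready())
print(ready)  # ['t1', 't2', 't5'] -- t5 can legally run before t2
```

To force the strict chain t1 -> t2 -> ... -> t6, each task needs an upstream edge to the previous one, e.g. via the `upstream_tasks` keyword shown later in this thread.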

Anna Geller

04/05/2022, 10:39 AM
Perhaps it's easier to start directly with Prefect 2.0 and Cloud 2.0?

Muhammad Daniyal

04/05/2022, 10:45 AM
Can we use version 2 in production? I am working on an office project; multiple components are working separately and working fine. Now I need to put them into a workflow.
I passed upstream_tasks while calling the tasks, but the results were the same: the workflow was still not going in the specified direction.

Anna Geller

04/05/2022, 10:48 AM
For mission-critical production workloads, you should use Prefect 1.0. Whether 2.0 is production-ready depends on what you want to do with it: simple scheduled workflows should work fine on Cloud 2.0, but if you need massive scale and an audit trail, then it's better to use 1.0.

Muhammad Daniyal

04/05/2022, 10:51 AM
Exactly, there are somewhat high-scale tasks in the flow. To put it simply, the flow is 'upload data -> preprocess data -> convert data to the desired format -> train models on the converted data -> make predictions with them'.

Anna Geller

04/05/2022, 10:52 AM
To ensure t5 and t6 always run, try this:
from prefect import task, Flow
from prefect.triggers import always_run

@task
def task_1():
    pass

@task
def task_2():
    pass

@task
def task_3():
    pass

@task
def task_4():
    pass

# in Prefect 1.0 the trigger is set on the task definition itself
@task(trigger=always_run)
def task_5():
    pass

@task(trigger=always_run)
def task_6():
    pass

with Flow("flow_with_dependencies") as flow:
    t1 = task_1()
    t2 = task_2(upstream_tasks=[t1])
    t3 = task_3(t1)
    t4 = task_4(t2)
    t5 = task_5(upstream_tasks=[t4])
    t6 = task_6(t5)
> somewhat high scale tasks are there in flow. if i say in simple words the flow is 'uploading data -> preprocess data -> convert data to desired format -> train models on converted data -> make prediction upon them'
This should work fine in 2.0. By massive scale, I meant more like running millions of tasks in parallel with the DaskTaskRunner.

Muhammad Daniyal

04/05/2022, 10:56 AM
All 6 tasks are running; the problem is that they are not running in the sequential order defined in the flow. The other thing is that t1 and t2 run fully (data preprocessing), while t3 and t4, which handle data conversion and model training, don't run fully (not even halfway) and skip processing, so t5 and t6 then give errors, since they do prediction and depend on the training.
Oh, I misunderstood what you meant by high scale; you meant heavy computation.
@Open AIMP

Anna Geller

04/05/2022, 12:27 PM
High computation wouldn't be a problem for Prefect, because it runs in your own execution environment; only the number of orchestratable actions could bring the API to its limits in any way.
Re: "problem is that they are not running in sequential order as defined in flow"
You can solve this by attaching a SequentialTaskRunner to your flow in 2.0, or a LocalExecutor in 1.0.
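A minimal sketch of that suggestion against the Prefect 2.0 beta-era API (assuming `prefect.task_runners.SequentialTaskRunner`; the task names here are illustrative, not from the thread):

```python
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task
def preprocess():
    return "data"

@task
def train(data):
    return f"model({data})"

# With SequentialTaskRunner attached, tasks execute one at a
# time, in the order they are called inside the flow function.
@flow(task_runner=SequentialTaskRunner())
def pipeline():
    data = preprocess()
    return train(data)
```

In 1.0, the LocalExecutor (the default) gives the same one-task-at-a-time behaviour, but you still need upstream dependencies to pin the exact order.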