jpuris

02/28/2023, 2:09 PM
Heya!! This might be a stupid question 😅 I have a flow with multiple tasks. In something like Airflow I'd need to explicitly define the dependencies between tasks in a DAG (flow), i.e.
task_1 >> task_2
task_2 >> task_3
task_2 >> task_4
...
Prefect instead determines this implicitly from how results are passed as parameters from task to task, i.e.
result_1 = task_1()
result_2 = task_2(result_1)
result_3 = task_3(result_2)
result_4 = task_4(result_2)
Which is fine, no problems there. But what do we do when there is a task 5 that does not need any result from the tasks it depends on? For example, task 5 must run only once task 4 is done, but has no other relationship with it, like
result_1 = task_1()
result_2 = task_2(result_1)
result_3 = task_3(result_2)
result_4 = task_4(result_2)
result_5 = task_5()
With the sequential task runner (default), this is OK, but what if I were to use the concurrent one? I want tasks 3 and 4 to run at the same time, but task 5 must not run before task 4 is done.
Is the best practice to simply default to using the `wait_for` arg added by the `@task` decorator? For example
result_5 = task_5(wait_for=[result_4])
But what do we do then, when task_4 returns nothing? i.e.
from prefect import flow, task

@task
def task_4(some_param):
    pass

@flow
def my_flow():
    result_1 = task_1()
    result_2 = task_2(result_1)
    result_3 = task_3(result_2)
    result_4 = task_4(result_2)  # It does not return anything?
    result_5 = task_5(wait_for=[result_4])  # wait_for takes a list

Christopher Boyd

02/28/2023, 2:15 PM
Yes, wait_for is the dependency key here. But you are correct: with the sequential task runner it will still run in sequence.
You would need the ConcurrentTaskRunner or even Dask. The concurrent runner runs tasks semi-parallel until it is blocked, so if you have task4 with a dependency on task3, and task5 with no wait_for, task5 still won't run, because it is blocked waiting for task4 to return before it gets a chance.
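To illustrate the ordering being discussed, here is a minimal sketch that uses only Python's standard library `concurrent.futures`. It is an analogy, not Prefect's API: tasks 3 and 4 are submitted together, and task 5 is submitted only after the future for task 4 has resolved, even though task 5 takes nothing from it. The task names mirror the thread; the function bodies, the `events` list and `run_pipeline` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

events = []              # records the order in which tasks finish
lock = threading.Lock()  # protects `events` across worker threads

def record(name):
    with lock:
        events.append(name)

def task_1():
    record("task_1")
    return 1

def task_2(x):
    record("task_2")
    return x + 1

def task_3(x):
    time.sleep(0.05)
    record("task_3")
    return x * 10

def task_4(x):
    time.sleep(0.05)
    record("task_4")     # no return statement, so the result is None

def task_5():
    record("task_5")
    return "done"

def run_pipeline():
    events.clear()
    with ThreadPoolExecutor(max_workers=4) as pool:
        r1 = task_1()
        r2 = task_2(r1)
        f3 = pool.submit(task_3, r2)   # tasks 3 and 4 run concurrently
        f4 = pool.submit(task_4, r2)
        f4.result()                    # block until task 4 is done (value is None)
        f5 = pool.submit(task_5)       # only now is task 5 allowed to start
        return f3.result(), f5.result()
```

Calling `run_pipeline()` always records `task_4` before `task_5`, while `task_3` may finish before or after either of them.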

jpuris

02/28/2023, 2:20 PM
I see. Thank you @Christopher Boyd! Lastly, when the task_4 function has no return statement, would this not cause issues when the result_4 variable is initialized?
@task
def task_4(some_param):
    pass

result_2 = None
result_4 = task_4(result_2) # It does not return anything?
I'd then think every task function is obliged to return something, i.e.
@task
def task_4(some_param):
    print('I did nothing and returning None')
    return None

result_2 = None
result_4 = task_4(result_2)
Which then works OK
Actually, all is OK: a Python function without a return statement returns None anyway.
def returns_nothing():
    pass


nothing = returns_nothing()
print(nothing)
outputs
None
👍
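The same point carries over to waiting on a task that returns nothing. A small sketch using the standard library's `concurrent.futures` (again an analogy, not Prefect code; `wait_on_none_future` is a hypothetical helper name): the future of a function with no return value still completes and can be waited on, and its result is simply `None`.

```python
from concurrent.futures import ThreadPoolExecutor, wait

def returns_nothing():
    pass

def wait_on_none_future():
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(returns_nothing)
        done, not_done = wait([future])   # blocks until the future completes
        # The future finished normally; its result is the implicit None.
        return future in done, future.result()
```

So the dependency is on the task finishing, not on the value it produces.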