sulfredlee
10/10/2022, 4:52 AMset_dependencies
I am trying to have this example:
@task(name="task_A")
def task_A() -(list, int):
test_output_list = list()
test_output_int = 10
return test_output_list, test_output_int
@task(name="task_B")
def task_B(input_a: list, input_b: int, input_c: float):
pirnt(f"{input_a}, {input_b}, {input_c},"
with Flow(name = "test_flow") as flow:
ret_a_list, ret_a_int = task_A()
task_B(ret_a_list, ret_a_int, 10.05)
I would like to change the implementation for the with Flow() as flow
to
test_flow = Flow(name="test_flow")
test_flow.set_dependencies([task_A, task_B], upstream_task=[task_A], downstream_tasks=[task_B])
I have 2 questions:
- is the set_dependencies
the correct function I should use?
- how to use set_dependencies
correctly to pass the output from task_A
to task_B
?
ThanksKalise Richmond
10/10/2022, 4:17 PMtask_A
to task_B
you can use the keyword_tasks. Here's a quick sample:
from prefect import Task, Flow
import random
class RandomNumber(Task):
def run(self):
x = random.randint(0, 100)
print(x)
return x
class PlusOneTask(Task):
def run(self, x):
print(x)
return x + 1
flow = Flow('My Imperative Flow')
plus_one = PlusOneTask()
flow.set_dependencies(
task=plus_one,
keyword_tasks=dict(x=RandomNumber())
)
flow.run()
sulfredlee
10/10/2022, 11:44 PM