sulfredlee

10/10/2022, 4:52 AM
Hi all! I am using Prefect 1.0 and looking for a way to update the flow dynamically. In this doc: https://docs-v1.prefect.io/api/latest/core/flow.html#flow-2, I see the function set_dependencies. I am trying to build this example:
from typing import Tuple

from prefect import task, Flow

@task(name="task_A")
def task_A() -> Tuple[list, int]:
    test_output_list = list()
    test_output_int = 10
    return test_output_list, test_output_int

@task(name="task_B")
def task_B(input_a: list, input_b: int, input_c: float):
    print(f"{input_a}, {input_b}, {input_c}")

with Flow(name="test_flow") as flow:
    ret_a_list, ret_a_int = task_A()
    task_B(ret_a_list, ret_a_int, 10.05)
I would like to change the with Flow() as flow implementation to:
test_flow = Flow(name="test_flow")
test_flow.set_dependencies([task_A, task_B], upstream_task=[task_A], downstream_tasks=[task_B])
I have 2 questions:
- Is set_dependencies the correct function to use?
- How do I use set_dependencies correctly to pass the output from task_A to task_B?
Thanks

Kalise Richmond

10/10/2022, 4:17 PM
Hi, you're correct: set_dependencies, from the imperative API, is the right function. To pass the output from task_A to task_B, use the keyword_tasks argument. Here's a quick sample:
from prefect import Task, Flow
import random


class RandomNumber(Task):
    def run(self):
        x = random.randint(0, 100)
        print(x)
        return x

class PlusOneTask(Task):
    def run(self, x):
        print(x)
        return x + 1

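# Build the flow imperatively, without the `with Flow(...)` context manager.
flow = Flow('My Imperative Flow')
plus_one = PlusOneTask()
# keyword_tasks maps each argument name of plus_one.run() to the upstream
# task whose result should be passed in for that argument.
flow.set_dependencies(
    task=plus_one,
    keyword_tasks=dict(x=RandomNumber())
)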

flow.run()
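To make the idea behind keyword_tasks concrete for the task_A / task_B case, here is a toy, standard-library-only model. It is not Prefect's code and does not use Prefect's API: ToyTask, ToyFlow, and the take_first / take_second helpers are made-up names for illustration. It only sketches the concept that each keyword edge names an argument of the downstream task and is filled with the upstream result (or a constant) when the flow runs.

```python
# Toy model only: NOT Prefect's implementation, just the keyword-edge idea.

class ToyTask:
    """Hypothetical stand-in for a task: wraps a plain function."""
    def __init__(self, fn):
        self.fn = fn


class ToyFlow:
    """Hypothetical stand-in for a flow that records keyword edges."""
    def __init__(self):
        # downstream task -> {argument name: upstream task or constant}
        self.edges = {}

    def set_dependencies(self, task, keyword_tasks=None):
        self.edges.setdefault(task, {}).update(keyword_tasks or {})

    def run(self, task, _cache=None):
        """Run `task` after resolving each of its keyword inputs."""
        cache = {} if _cache is None else _cache
        if task not in cache:
            kwargs = {}
            for name, upstream in self.edges.get(task, {}).items():
                if isinstance(upstream, ToyTask):
                    # Upstream task: run it (once) and pass its result.
                    kwargs[name] = self.run(upstream, cache)
                else:
                    # Anything else is treated as a constant.
                    kwargs[name] = upstream
            cache[task] = task.fn(**kwargs)
        return cache[task]


def produce_pair():
    return [], 10

def take_first(pair):
    return pair[0]

def take_second(pair):
    return pair[1]

def describe(input_a, input_b, input_c):
    return f"{input_a}, {input_b}, {input_c}"


task_a = ToyTask(produce_pair)
first = ToyTask(take_first)
second = ToyTask(take_second)
task_b = ToyTask(describe)

flow = ToyFlow()
flow.set_dependencies(first, keyword_tasks=dict(pair=task_a))
flow.set_dependencies(second, keyword_tasks=dict(pair=task_a))
flow.set_dependencies(
    task_b,
    keyword_tasks=dict(input_a=first, input_b=second, input_c=10.05),
)

result = flow.run(task_b)
print(result)
```

In this sketch the tuple returned by the first task is split by two small helper tasks, so every input of the last task arrives through its own named edge. Whether the same split is needed in a real Prefect 1.0 imperative flow is something to check against the docs.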
Here's some more documentation about the imperative api

sulfredlee

10/10/2022, 11:44 PM
Thanks for your reply. I will try this.