https://prefect.io logo
c

Chris L.

01/24/2022, 3:52 AM
Hello Prefecters! Wondering how one might use the resource manager (https://docs.prefect.io/api/latest/tasks/resources.html#functions) with the imperative API for flows (https://docs.prefect.io/core/concepts/flows.html#imperative-api). I'm using the imperative API with a for loop to add a list of tasks to the flow. But I would like to pass the result (a SQL connection) from
ResourceManager().setup()
to my tasks and also have cleanup. Thanks!
k

Kevin Kho

01/24/2022, 4:16 AM
This seems quite hard. I would actually advise to use the for-loop in the functional API to add the tasks to the Flow if you can?
👀 1
c

Chris L.

01/24/2022, 4:18 AM
Ah of course. I could just use task mapping.
k

Kevin Kho

01/24/2022, 4:20 AM
Yes but also you can do something like this:
Copy code
from prefect import Flow, task


@task
def add_one(x):
    res = x + 1
    print(res)
    return res


@task
def add_two(x):
    res = x + 2
    print(res)
    return res


with Flow("forloop") as flow:
    inputs = [1, 2, 3, 4, 5]
    tasks = []
    for _in in inputs:
        a = add_one(_in)
        b = add_two(a)
        tasks.append(a)
        tasks.append(b)

    # set dependencies
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])

flow.run()
🙌 1
c

Chris L.

01/24/2022, 4:21 AM
Didn't know that was possible! Thanks :O
k

Kevin Kho

01/24/2022, 4:23 AM
Only if the
inputs
is a pre-defined list. If it’s dynamic and the result of a Task, then you can’t loop over it because the value doesnt exist when the DAG is built
👀 1
8 Views