
Shuchita Tripathi

04/18/2022, 7:12 PM
Hi, I am trying to create a flow based on some dynamic values. These dynamic values determine which tasks are needed to build that flow. My tasks are saved in separate files, for example task1.py:
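from prefect import task
from python_terraform import Terraform  # assumed: Terraform comes from the python-terraform package

def task1():
    @task
    def run_terraform_lt():
        tf = Terraform(working_dir="law_tf")
        tf.init()
        tf.apply()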
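and task2.py:
from prefect import task
from python_terraform import Terraform  # assumed: Terraform comes from the python-terraform package

def task2():
    @task
    def run_terraform_rt():
        tf = Terraform(working_dir="rt_tf")
        tf.init()
        tf.apply()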
The input is similar to this:
{
    "task1": {
        "id": "foo",
        "task_name": "task1"
    },
    "task2": {
        "id": "bar",
        "task_name": "task2"
    }
}
I am getting the task name from this dictionary. Based on the value of "task_name", I have to create a flow that combines all of the listed tasks. I am creating a flow and trying to add them to it, but the tasks are not getting added to the flow. Does anyone have an idea how this can be achieved? Here is a snippet of my flow-creation code; it sits inside a for loop that extracts the task name and other variables.
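Here is a minimal sketch of the first step described above: pulling the task names out of the input dictionary in order. It uses only the standard library. The function name requested_task_names is hypothetical, and the trailing comma from the original input is dropped so the input is valid JSON.

```python
import json
from typing import List

# Example request in the shape shown above.
REQUEST_JSON = """
{
    "task1": {"id": "foo", "task_name": "task1"},
    "task2": {"id": "bar", "task_name": "task2"}
}
"""


def requested_task_names(request: dict) -> List[str]:
    """Return the task_name of every entry, in the order the entries appear."""
    return [spec["task_name"] for spec in request.values()]


request = json.loads(REQUEST_JSON)
print(requested_task_names(request))  # ['task1', 'task2']
```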

Kevin Kho

04/18/2022, 7:24 PM
I think the first issue here is that the tasks aren't being added to the flow. Why is your syntax like that?
def task2():
    @task
    def run_terraform_rt():
        tf = Terraform(working_dir="rt_tf")
        tf.init()
        tf.apply()
How do you call this in the Flow?

Shuchita Tripathi

04/18/2022, 7:28 PM
I was thinking that if I can somehow call task2(), that might work. But if you have any other suggestions, I can try them.

Kevin Kho

04/18/2022, 7:29 PM
But you can already do:
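from prefect import task
from python_terraform import Terraform  # assumed: the python-terraform package

@task
def run_terraform_rt():
    tf = Terraform(working_dir="rt_tf")
    tf.init()
    tf.apply()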
and then call run_terraform_rt in the flow. Why do you need the outer function?

Shuchita Tripathi

04/18/2022, 7:38 PM
Since the task name (and in turn the filename) is dynamic, a normal import was not working. I had to use importlib, and without the outer function there were some errors. I'll try again now. I cannot call run_terraform_rt() directly because the task list is dynamic; this is where the problem is.
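For reference, here is a minimal sketch of loading a task by name with importlib (spec_from_file_location plus exec_module). It assumes each file <task_name>.py defines a module-level function with the same name as the file, without the outer wrapper. The load_task helper and the demo files are hypothetical. Prefect is left out so the sketch runs on its own; with a real @task-decorated function, getattr would return the Task object instead of a plain function.

```python
import importlib.util
import tempfile
from pathlib import Path
from typing import Callable


def load_task(task_name: str, tasks_dir: Path) -> Callable:
    """Import <tasks_dir>/<task_name>.py and return the attribute named <task_name>.

    Assumption: each file defines its task at module level, named after the file.
    """
    path = tasks_dir / f"{task_name}.py"
    spec = importlib.util.spec_from_file_location(task_name, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # executes the file, defining its functions
    return getattr(module, task_name)


# Demo: write two hypothetical task files to a temporary directory, then load them by name.
with tempfile.TemporaryDirectory() as tmp:
    tasks_dir = Path(tmp)
    (tasks_dir / "task1.py").write_text("def task1():\n    return 'ran task1'\n")
    (tasks_dir / "task2.py").write_text("def task2():\n    return 'ran task2'\n")
    results = [load_task(name, tasks_dir)() for name in ["task1", "task2"]]

print(results)  # ['ran task1', 'ran task2']
```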

Kevin Kho

04/18/2022, 7:40 PM
I think you are trying to add tasks to the Flow dynamically. That is not the way to do this, because the static DAG has to be created at registration time. Instead, build the full DAG, register it, and then conditionally skip tasks based on the parameters.
Alternatively, you can put more logic inside the task so that it branches on the parameter inputs. Can you use if-else inside a task to describe your logic?
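Here is a plain-Python sketch of the "full DAG, skip at run time" idea. The set of steps is fixed before anything runs, and the parameters only decide which steps do any work. The sketch does not use Prefect (Prefect 1.x has its own building blocks for this, such as Parameter and case). The step names are hypothetical stand-ins for the Terraform tasks.

```python
from typing import Callable, Dict, List


def create_resource_group() -> str:
    return "resource group created"


def create_virtual_machine() -> str:
    return "virtual machine created"


# Built once, up front: every step that could ever run (the "static DAG").
ALL_STEPS: Dict[str, Callable[[], str]] = {
    "resource_group": create_resource_group,
    "virtual_machine": create_virtual_machine,
}


def run(enabled: List[str]) -> Dict[str, str]:
    """Visit every step in the fixed structure; skip the ones the parameters don't enable."""
    results = {}
    for name, step in ALL_STEPS.items():
        results[name] = step() if name in enabled else "skipped"
    return results


print(run(["virtual_machine"]))
# {'resource_group': 'skipped', 'virtual_machine': 'virtual machine created'}
```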

Shuchita Tripathi

04/18/2022, 7:44 PM
Yes, I can use if-else inside the task.
This is the message I am getting now.
I haven't used if-else yet; this is just with the outer function removed.

Kevin Kho

04/18/2022, 8:01 PM
Can I see the code?

Shuchita Tripathi

04/18/2022, 8:05 PM
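from prefect import task
from python_terraform import Terraform  # assumed: the python-terraform package

@task
def task1():
    tf = Terraform(working_dir="tf")
    tf.init()
    tf.plan()
    tf.apply(skip_plan=True)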

Kevin Kho

04/18/2022, 8:10 PM
I think your code is doing something like this:
from prefect import Flow, task

with Flow("name") as flow:
    @task
    def abc():
        return 1
which will complain because the task is defined but never called. It should really be defined outside the Flow and then called inside it:
from prefect import Flow, task

@task
def abc():
    return 1

with Flow("name") as flow:
    abc()
Or you could call it inside the Flow:
from prefect import Flow, task

with Flow("name") as flow:
    @task
    def abc():
        return 1
    abc()
but I don’t think we’d recommend defining tasks inside the Flow.

Shuchita Tripathi

04/18/2022, 8:13 PM
I understand your point. I just want my flow to call a task that is defined in a different file. The only problem is that the task name is not static, so I cannot hard-code it; I have to get it from the input, which is passed as a dictionary.

Kevin Kho

04/18/2022, 8:14 PM
This is to build a flow, not to run it, right?
s

Shuchita Tripathi

04/18/2022, 8:15 PM
Yes, just to build.

Kevin Kho

04/18/2022, 8:16 PM
Maybe you can use Python's exec function, which takes a string and runs it as Python code? But I don't think this will work well at run time; the Flow may have trouble finding that task definition. I don't know.

Shuchita Tripathi

04/18/2022, 8:17 PM
OK, I'll try that. Right now I am using exec_module.

Kevin Kho

04/18/2022, 8:19 PM
Wait though, I am wondering why this can’t go inside the task?
@task
def task_one(val):
    if val == "func_a":
        func_a()
    if val == "func_b":
        func_b()
Like that?
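Here is one way to keep the if-else idea from growing into a long chain: a lookup table from name to function inside the task body. This is a minimal sketch with the hypothetical func_a and func_b. The @task decorator is omitted so the snippet runs without Prefect.

```python
from typing import Callable, Dict


def func_a() -> str:
    return "a"


def func_b() -> str:
    return "b"


# Lookup table replacing the chain of if-statements; add one entry per new activity.
DISPATCH: Dict[str, Callable[[], str]] = {"func_a": func_a, "func_b": func_b}


def task_one(val: str) -> str:
    """Body you would decorate with @task: pick the function by name at run time."""
    func = DISPATCH.get(val)
    if func is None:
        raise ValueError(f"unknown activity: {val!r}")
    return func()


print(task_one("func_b"))  # b
```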

Shuchita Tripathi

04/18/2022, 8:23 PM
We are planning to have a lot of tasks, each performing a single activity. Then, based on a JSON request, a flow needs to be created that mixes and matches those tasks. Each task creates a single resource in Azure, and a request can ask for any resources in any number. One request might ask for a resource group and a virtual machine; another might ask for a security group and a database; and so on. So we can't hard-code which tasks go into the flow.

Kevin Kho

04/18/2022, 8:25 PM
I understand better now, but I can’t picture an elegant way to do it. I will say that you can also build the DAG conditionally:
def create_flow(var):
    with Flow(...) as flow:
        if var:
            x = task_one()
        else:
            x = task_two()
        task_two(x)
I think this might work.
I'd encourage you to make a smaller proof of concept first, though, because I don’t know whether the tasks will load properly at run time. I think they might.
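Here is a plain-Python sketch of the build-time/run-time split discussed here. create_flow picks the steps from the request once, and run_flow executes them later. In Prefect 1.x terms, the build step would correspond to calling the chosen tasks inside a with Flow(...) block before registering. This sketch does not use Prefect, and the registry and resource functions are hypothetical.

```python
from functools import partial
from typing import Callable, Dict, List


# Hypothetical one-resource-per-task functions (stand-ins for the Azure/Terraform tasks).
def resource_group(spec: dict) -> str:
    return f"resource group {spec['id']}"


def virtual_machine(spec: dict) -> str:
    return f"virtual machine {spec['id']}"


def database(spec: dict) -> str:
    return f"database {spec['id']}"


REGISTRY: Dict[str, Callable[[dict], str]] = {
    "resource_group": resource_group,
    "virtual_machine": virtual_machine,
    "database": database,
}


def create_flow(request: dict) -> List[Callable[[], str]]:
    """Build step: choose the steps for this request; nothing runs yet."""
    steps = []
    for spec in request.values():
        func = REGISTRY[spec["task_name"]]  # KeyError signals an unknown resource type
        steps.append(partial(func, spec))   # bind this spec now, call later
    return steps


def run_flow(steps: List[Callable[[], str]]) -> List[str]:
    """Run step: execute the steps chosen at build time, in order."""
    return [step() for step in steps]


request = {
    "rg": {"id": "foo", "task_name": "resource_group"},
    "vm": {"id": "bar", "task_name": "virtual_machine"},
}
steps = create_flow(request)
print(run_flow(steps))  # ['resource group foo', 'virtual machine bar']
```

functools.partial binds each spec when the step is created. As a result, every step keeps its own arguments instead of all of them picking up the last value from the loop.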

Shuchita Tripathi

04/18/2022, 8:28 PM
OK, I'll keep trying. I'll post the answer here if I find anything.