Shuchita Tripathi
04/18/2022, 7:12 PM
def task1():
    @task
    def run_terraform_lt():
        tf = Terraform(working_dir="law_tf")
        tf.init()
        tf.apply()
task2.py
def task2():
    @task
    def run_terraform_rt():
        tf = Terraform(working_dir="rt_tf")
        tf.init()
        tf.apply()
The input is similar to this:
{
    "task1": {
        "id": "foo",
        "task_name": "task1"
    },
    "task2": {
        "id": "bar",
        "task_name": "task2"
    }
}
I am getting the task name from this dictionary. Based on the value of "task_name", I have to create a flow combining all tasks.
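One way to get from the "task_name" values to actual functions is a lookup table. The sketch below is plain Python and uses hypothetical stand-in functions in place of the Terraform tasks; `TASK_REGISTRY` and `select_tasks` are illustrative names, not part of Prefect. In a real flow, the looked-up tasks would still have to be called inside the `with Flow(...)` block.

```python
# Hypothetical stand-ins for the Terraform-backed tasks in this thread;
# each one only reports the working directory it would apply.
def run_terraform_lt():
    return "law_tf"


def run_terraform_rt():
    return "rt_tf"


# Registry keyed by the "task_name" values found in the input (assumed mapping).
TASK_REGISTRY = {
    "task1": run_terraform_lt,
    "task2": run_terraform_rt,
}

config = {
    "task1": {"id": "foo", "task_name": "task1"},
    "task2": {"id": "bar", "task_name": "task2"},
}


def select_tasks(config):
    """Return the registered callables named in the input, in input order."""
    selected = []
    for entry in config.values():
        name = entry["task_name"]
        if name not in TASK_REGISTRY:
            raise KeyError(f"no task registered for {name!r}")
        selected.append(TASK_REGISTRY[name])
    return selected


selected = select_tasks(config)
results = [fn() for fn in selected]
```

Failing loudly on an unknown name keeps a typo in the input from silently producing a flow with a missing step.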
I am creating a flow where I am trying to add them, but the tasks are not getting added to the flow. Does anyone have any idea how this can be achieved? Here is the snippet of my flow creation code. It is inside a for loop through which I am extracting the task name and other variables.
Kevin Kho
def task2():
    @task
    def run_terraform_rt():
        tf = Terraform(working_dir="rt_tf")
        tf.init()
        tf.apply()
How do you call this in the Flow?
Shuchita Tripathi
04/18/2022, 7:28 PM
Kevin Kho
@task
def run_terraform_rt():
    tf = Terraform(working_dir="rt_tf")
    tf.init()
    tf.apply()
and then call run_terraform_rt in the flow. Why do you need the outer function?
Shuchita Tripathi
04/18/2022, 7:38 PM
Kevin Kho
Shuchita Tripathi
04/18/2022, 7:44 PM
Kevin Kho
Shuchita Tripathi
04/18/2022, 8:05 PM
@task
def task1():
    tf = Terraform(working_dir="tf")
    tf.init()
    tf.plan()
    tf.apply(skip_plan=True)
Kevin Kho
from prefect import Flow, task
with Flow("name") as flow:
    @task
    def abc():
        return 1
which will complain because the task is defined but not called. It should really be defined outside then used
@task
def abc():
    return 1
with Flow("name") as flow:
    abc()
or you could call it
from prefect import Flow, task
with Flow("name") as flow:
    @task
    def abc():
        return 1
    abc()
but I don’t think we’d recommend defining the tasks inside the Flow
Shuchita Tripathi
04/18/2022, 8:13 PM
Kevin Kho
Shuchita Tripathi
04/18/2022, 8:15 PM
Kevin Kho
exec function which takes in a string and runs it as Python code? But I think this will not work well at run time. I think the Flow may have problems finding that task definition. I don't know.
Shuchita Tripathi
04/18/2022, 8:17 PM
Kevin Kho
@task
def task_one(val):
    if val == "func_a":
        func_a()
    if val == "func_b":
        func_b()
like that?
Shuchita Tripathi
04/18/2022, 8:23 PM
Kevin Kho
def create_flow(var):
    with Flow(...) as flow:
        if var:
            x = task_one()
        else:
            x = task_two()
        task_two(x)
I think this might work
Shuchita Tripathi
04/18/2022, 8:28 PM
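The outer-function question raised earlier in the thread can be illustrated in plain Python. This is a minimal sketch with no Prefect or Terraform involved: `wrapped` is a hypothetical name and the return strings are placeholders. It shows that calling a wrapper which only defines an inner function hands nothing back to the caller. This is likely the same reason a task defined inside `task1()` or `task2()` never reaches the flow.

```python
def wrapped():
    # The inner function is created each time wrapped() runs, but it is
    # neither called nor returned, so the caller can never reach it.
    def run_terraform_lt():
        return "law_tf applied"


def run_terraform_rt():
    # Defined at module level, so it can be imported and called directly.
    return "rt_tf applied"


outer_result = wrapped()             # the inner function is discarded
direct_result = run_terraform_rt()   # the module-level function actually runs
```

Here `outer_result` is `None`, while `direct_result` holds the placeholder string. Defining each task at module level and calling it inside the flow block, as suggested above, avoids the problem.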