Jack Sundberg
06/01/2022, 12:13 AMimport prefect
from prefect import task, Flow
@task
def parse_task():
name = prefect.context.flow_name
return {"name": name, "test": 123}
@task
def split_kwargs_task(name, test):
print(name)
print(test)
# Does NOT work, but desired approach
with Flow("hello-flow") as flow:
parameters_cleaned = parse_task()
split_kwargs_task(**parameters_cleaned) # <-- raises error
# Manually pulling out each kwarg gets this to work
with Flow("hello-flow") as flow:
parameters_cleaned = parse_task()
split_kwargs_task(
name=parameters_cleaned["name"],
test=parameters_cleaned["test"],
)
flow.run()
Kevin Kho
06/01/2022, 12:23 AM**
operator is evaluated during DAG construction time, not Flow run time. There is magic here parameters_cleaned["name"]
happens during runtime because it’s a task. There is a GetItem task that is added to the flow when you use this syntax on a Task object.Jack Sundberg
06/01/2022, 12:24 AMAnna Geller
06/01/2022, 12:24 AMtask_args
to pass some extra task argsJack Sundberg
06/01/2022, 12:25 AMAnna Geller
06/01/2022, 10:17 AM