Martim Lobao
02/23/2022, 4:55 PM
[…] `List` and `Dict` tasks in my DAG? This is taken from a barely modified example on Prefect’s blog (code in thread).
Martim Lobao
02/23/2022, 4:56 PM
from typing import List
from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.tasks.prefect import create_flow_run, get_task_run_result
from prefect.executors import LocalExecutor
@task(result=LocalResult(), log_stdout=True)
def create_some_data(length: int):
    """Print ``length`` and return the integers ``0 .. length - 1`` as a list.

    Registered as a Prefect task with a ``LocalResult`` and ``log_stdout=True``.
    """
    print(length)
    values = [*range(length)]
    return values
@task(result=LocalResult(), log_stdout=True)
def minimum(data: List[int]) -> int:
    """Return the first element of ``data`` plus 2, printing it beforehand.

    NOTE(review): despite its name this task does not compute a minimum; it
    only reads ``data[0]``. An empty ``data`` raises ``IndexError``.
    """
    bumped = data[0] + 2
    print("new", bumped)
    return bumped
# Child flow: builds a list of `data_size` integers (task slug "foobar") and
# feeds it to `minimum` (task slug "min"). The slugs are what the parent flow
# uses to look up each task's result.
with Flow("child", executor=LocalExecutor()) as child_flow:
    data_size = Parameter("data_size", default=5)
    data = create_some_data(data_size, task_args={"slug": "foobar"})
    minimum(data, task_args={"slug": "min"})
@task(log_stdout=True)
def transform_and_show(data: List[int]) -> List[int]:
    """Print ``data``, add 1 to every element, print and return the new list.

    The input list is left untouched; a new list is built and returned.
    """
    print(f"Got: {data!r}")
    new_data = []
    for value in data:
        new_data.append(value + 1)
    print(f"Created: {new_data!r}")
    return new_data
# Parent flow: runs the child flow twice. The "min" result of the first run
# becomes the `data_size` parameter of the second run, whose "foobar" result
# is then transformed and printed.
with Flow("parent", executor=LocalExecutor()) as parent_flow:
    child_run_id = create_flow_run(
        flow_name=child_flow.name,
        parameters={"data_size": 10},
    )
    # NOTE(review): inside the `with Flow` block `child_min` is a task object,
    # not an int, so the dict below holds a task rather than a plain value.
    child_min = get_task_run_result(child_run_id, "min")

    new_child_run_id = create_flow_run(
        flow_name=child_flow.name,
        parameters={"data_size": child_min},
    )
    child_data = get_task_run_result(new_child_run_id, "foobar")
    transform_and_show(child_data)
Anna Geller
Martim Lobao
02/23/2022, 5:17 PM
[…] `List` exists at all, but it looks like the following change cleans everything up 🙂
@task
def get_flow_parameters(data_size):
    """Wrap ``data_size`` in the parameter dict expected by the child flow.

    Returns ``{"data_size": data_size}``.
    """
    return {"data_size": data_size}
# Parent flow (revised): each child-run parameter dict is produced by the
# `get_flow_parameters` task rather than by an inline dict.
with Flow("parent", executor=LocalExecutor()) as parent_flow:
    first_params = get_flow_parameters(data_size=10)
    child_run_id = create_flow_run(
        flow_name=child_flow.name,
        parameters=first_params,
    )

    # The "min" result of the first child run sizes the second child run.
    child_min = get_task_run_result(child_run_id, "min")
    second_params = get_flow_parameters(data_size=child_min)
    new_child_run_id = create_flow_run(
        flow_name=child_flow.name,
        parameters=second_params,
    )

    child_data = get_task_run_result(new_child_run_id, "foobar")
    transform_and_show(child_data)
Martim Lobao
02/23/2022, 5:17 PM
`child_min` isn’t an `int` as I was expecting but rather a `<class 'prefect.tasks.core.function.FunctionTask'>` instead.
Anna Geller