Sylvain Hazard
11/22/2021, 11:01 AM
Is it possible to use the create_flow_run paradigm to create multiple flow runs with differing parameters using map?
Been trying it out and can't seem to make it work. Code below.
from prefect import Flow, task, Parameter, unmapped
import prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task
def plus_one(x: int):
    return x + 1

@task
def squared(x: int):
    return x ** 2

@task
def _print(x: str):
    logger = prefect.context.get("logger")
    logger.info(x)

with Flow(name="Plus one squared") as plus_one_flow:
    x = Parameter("x", default=1)
    plus_one_t = plus_one(x)
    sq = squared(plus_one_t)
    _print(sq)

plus_one_flow.register(project_name="tmp")

@task
def get_numbers():
    return [1, 2, 3, 4]

with Flow(name="Mapped plus one") as mapped_flow:
    numbers = get_numbers()
    mapped_flows = create_flow_run.map(
        flow_name=unmapped("Plus one squared"),
        project_name=unmapped("tmp"),
        parameters={"x": numbers},
    )
    mapped_wait_task = wait_for_flow_run(mapped_flows, raise_final_state=True)

mapped_flow.run()
[2021-11-22 12:00:27+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Mapped plus one'
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'get_numbers': Starting task run...
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'get_numbers': Finished task run for task with final state: 'Success'
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
[2021-11-22 12:00:27+0100] ERROR - prefect.FlowRunner | Unexpected error: KeyError(0)
Traceback (most recent call last):
  File "/home/sylvain/.pyenv/versions/prefect/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/sylvain/.pyenv/versions/prefect/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/home/sylvain/.pyenv/versions/prefect/lib/python3.8/site-packages/prefect/utilities/executors.py", line 668, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
[2021-11-22 12:00:27+0100] ERROR - prefect.Mapped plus one | Unexpected error occured in FlowRunner: KeyError(0)
Anna Geller
11/22/2021, 11:15 AM
from prefect import Flow, Parameter, task

@task(log_stdout=True)
def hello_world(user_input: str):
    print(f"hello {user_input}!")

with Flow("dummy-child-flow") as flow:
    param = Parameter("user_input", default="world")
    hw = hello_world(param)

• parent flow to trigger flow runs
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [
        dict(user_input="Prefect"),
        dict(user_input="Marvin"),
        dict(user_input="World"),
    ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
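
A note on why the first attempt raised KeyError(0), as far as the traceback suggests: the mapped parameters argument there was a single dict, {"x": numbers}, and the failing line (value = upstream_state.result[i]) indexes the mapped value by position. A dict has no key 0, whereas a list of dicts like the one in the parent flow above can be indexed element by element. The plain-Python sketch below only mimics that indexing and does not use Prefect. build_parameters is a hypothetical helper; in a real flow it would presumably have to be a task, since numbers is itself a task result.

```python
from typing import Dict, List


def build_parameters(numbers: List[int]) -> List[Dict[str, int]]:
    """Hypothetical helper: one parameters dict per child flow run."""
    return [{"x": n} for n in numbers]


numbers = [1, 2, 3, 4]

# Shape used in the first attempt: a single dict holding the whole list.
dict_of_list = {"x": numbers}
try:
    dict_of_list[0]  # positional indexing, as in `upstream_state.result[i]`
    first_error = None
except KeyError as exc:
    first_error = exc  # the dict has no key 0

# Shape used in the working example: a list with one dict per run.
list_of_dicts = build_parameters(numbers)
first_item = list_of_dicts[0]  # positional indexing works on a list
```

With this shape, each mapped child run would receive exactly one dict, for example {"x": 1} for the first run.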
Sylvain Hazard
11/22/2021, 11:20 AM
How would you handle the wait_for_flow_run part? I naively tried wait_for_flow_run.map(mapped_flows, raise_final_state=True), but it seems to fail instantly.
Also, if I use a DaskExecutor with a Dask cluster composed of 6 nodes, will I run 6 instances of the subflow at a time?
Anna Geller
11/22/2021, 11:31 AM
"The number of flows that will be run concurrently depends entirely on the number of tasks the parent flow is able to run concurrently?"
The number of concurrent flows will be equal to the number of elements in the list you map over, here the list of parameters. Of course, if your instance is overwhelmed, it may not be able to handle that many, but conceptually there are no restrictions here.
from prefect import Flow, unmapped, task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor
from prefect.triggers import all_finished

with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [
        dict(user_input="Arthur"),
        dict(user_input="Marvin"),
        dict(user_input="Ford"),
    ]
    mapped_flow_ids = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("p"),
    )
    wait_for_flow_run.map(mapped_flow_ids, raise_final_state=unmapped(True))

if __name__ == "__main__":
    flow.register("p")
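
The difference from the naive wait_for_flow_run.map(mapped_flows, raise_final_state=True) attempt is the unmapped(True) wrapper. As I understand Prefect 1.x mapping, every argument passed to .map is iterated over unless it is wrapped in unmapped, so a bare True would be treated as something to map over. The toy sketch below imitates that rule in plain Python. Unmapped, toy_map and wait are hypothetical stand-ins written for this illustration, not Prefect's implementation, and the real error raised by Prefect may differ.

```python
from typing import Any, Callable, List


class Unmapped:
    """Toy stand-in for a 'do not map over this' wrapper (not Prefect's class)."""

    def __init__(self, value: Any) -> None:
        self.value = value


def toy_map(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
    """Call fn once per element of the mapped arguments, broadcasting Unmapped ones."""
    mapped = [a for a in list(args) + list(kwargs.values()) if not isinstance(a, Unmapped)]
    if not mapped:
        raise ValueError("at least one argument must be mapped")
    # len() raises TypeError for a non-sequence such as a bare True.
    length = min(len(a) for a in mapped)

    def pick(arg: Any, i: int) -> Any:
        return arg.value if isinstance(arg, Unmapped) else arg[i]

    return [
        fn(*(pick(a, i) for a in args), **{k: pick(v, i) for k, v in kwargs.items()})
        for i in range(length)
    ]


def wait(run_id: str, raise_final_state: bool = False) -> str:
    """Hypothetical stand-in for waiting on one child flow run."""
    return f"{run_id}:{raise_final_state}"


run_ids = ["a", "b", "c"]

# Wrapped constant: the same True is passed to every mapped call.
results = toy_map(wait, run_ids, raise_final_state=Unmapped(True))

# Bare constant: treated as a mapped argument, which cannot be iterated.
try:
    toy_map(wait, run_ids, raise_final_state=True)
    naive_error = None
except TypeError as exc:
    naive_error = exc
```

In this toy version the wrapped call produces one result per run id, while the bare True fails before any call is made, which is at least consistent with the "fails instantly" behaviour described above.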
Sylvain Hazard
11/22/2021, 12:39 PM
Anna Geller
11/22/2021, 12:44 PM
Sylvain Hazard
11/22/2021, 1:03 PM
Kevin Kho
11/22/2021, 2:44 PM
Sylvain Hazard
11/22/2021, 2:56 PM
Kevin Kho
11/22/2021, 3:04 PM
Sylvain Hazard
11/22/2021, 3:06 PM
Kevin Kho
11/22/2021, 3:07 PM