
Sylvain Hazard

11/22/2021, 11:01 AM
Hello! Is it possible to use the create_flow_run paradigm to create multiple flow runs with differing parameters using map? Been trying it out and can't seem to make it work. Code below.
from prefect import Flow, task, Parameter, unmapped
import prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task
def plus_one(x: int):
    return x+1

@task
def squared(x: int):
    return x**2

@task
def _print(x: str):
    logger = prefect.context.get("logger")
    logger.info(x)
    

with Flow(name="Plus one squared") as plus_one_flow:
    x = Parameter("x", default=1)
    plus_one_t = plus_one(x)
    sq = squared(plus_one_t)
    _print(sq)

plus_one_flow.register(project_name="tmp")

@task
def get_numbers():
    return [1, 2, 3, 4]

with Flow(name="Mapped plus one") as mapped_flow:
    numbers = get_numbers()
    mapped_flows = create_flow_run.map(flow_name=unmapped("Plus one squared"), project_name=unmapped("tmp"), parameters={"x": numbers})
    mapped_wait_task = wait_for_flow_run(mapped_flows, raise_final_state=True)

mapped_flow.run()
And the error I get:
[2021-11-22 12:00:27+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Mapped plus one'
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'get_numbers': Starting task run...
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'get_numbers': Finished task run for task with final state: 'Success'
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
[2021-11-22 12:00:27+0100] ERROR - prefect.FlowRunner | Unexpected error: KeyError(0)
Traceback (most recent call last):
  File "/home/sylvain/.pyenv/versions/prefect/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/sylvain/.pyenv/versions/prefect/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/home/sylvain/.pyenv/versions/prefect/lib/python3.8/site-packages/prefect/utilities/executors.py", line 668, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
[2021-11-22 12:00:27+0100] ERROR - prefect.Mapped plus one | Unexpected error occured in FlowRunner: KeyError(0)

Anna Geller

11/22/2021, 11:15 AM
You are close! 🙂 If you want to map over parameters, then parameters must be a list of dictionaries. Here is a tiny example:
• a child flow that stands in for the actual flow you want to pass various parameters to
from prefect import Flow, Parameter, task


@task(log_stdout=True)
def hello_world(user_input: str):
    print(f"hello {user_input}!")


with Flow("dummy-child-flow") as flow:
    param = Parameter("user_input", default="world")
    hw = hello_world(param)
• parent flow to trigger flow runs
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [dict(user_input="Prefect"),
                  dict(user_input="Marvin"),
                  dict(user_input="World"),
                  ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
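A minimal sketch of why the original attempt raised KeyError(0) and how to build the list of dictionaries from a list of numbers. It is plain Python with no Prefect imports; build_parameters is a hypothetical helper name, and in a real flow it would presumably be decorated with @task and fed the output of get_numbers. The explanation of the error is an interpretation of the traceback above, not a confirmed description of the engine internals.

```python
from typing import Dict, List


def build_parameters(numbers: List[int]) -> List[Dict[str, int]]:
    """Turn [1, 2, 3] into [{"x": 1}, {"x": 2}, {"x": 3}]."""
    return [{"x": n} for n in numbers]


# The traceback shows the engine doing `upstream_state.result[i]`.
# With parameters={"x": numbers} the mapped value is a single dict,
# and a dict has no key 0, hence KeyError(0).
wrong = {"x": [1, 2, 3, 4]}
try:
    wrong[0]
except KeyError as exc:
    error_repr = repr(exc)

# A list of dicts can be indexed by position: one dict per child flow run.
right = build_parameters([1, 2, 3, 4])
first = right[0]
```

With this shape, create_flow_run.map(parameters=...) receives one dictionary per child flow run, which is what the example above passes in.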

Sylvain Hazard

11/22/2021, 11:20 AM
Oh, that makes a lot of sense! It makes dealing with multiple parameters way easier on the core engine. What about the wait_for_flow_run part? I naively tried going for
wait_for_flow_run.map(mapped_flows, raise_final_state=True)
but it seems to fail instantly 😞
Also, just to make sure I understand correctly what's going on: the number of flows that will run concurrently depends entirely on the number of tasks the parent flow is able to run concurrently? E.g. if I have a DaskExecutor with a Dask cluster composed of 6 nodes, I'll run 6 instances of the subflow at a time?

Anna Geller

11/22/2021, 11:31 AM
the number of flows that will run concurrently depends entirely on the number of tasks the parent flow is able to run concurrently?
The number of concurrent flows will be equal to the number of elements in the list you map over, here the list of parameters. Of course, if your instance is overwhelmed, it may not be able to handle that many, but conceptually there are no restrictions here.
I tested it with wait_for_flow_run and it looks like it works just fine, as long as you pass the raise_final_state kwarg as unmapped:
from prefect import Flow, unmapped, task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor
from prefect.triggers import all_finished


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [
        dict(user_input="Arthur"),
        dict(user_input="Marvin"),
        dict(user_input="Ford"),
    ]
    mapped_flow_ids = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("p"),
    )
    wait_for_flow_run.map(mapped_flow_ids, raise_final_state=unmapped(True))


if __name__ == "__main__":
    flow.register("p")

Sylvain Hazard

11/22/2021, 12:39 PM
Right, I missed the unmapped part. It works perfectly well now, thanks! Is there a way to restrict the number of concurrent flows that will be created by this kind of pattern? In my actual use case, we will have up to ~200K elements in the parameters list and I'm afraid our DevOps team will hate me when I try to run this on our k8s cluster.

Anna Geller

11/22/2021, 12:44 PM
If you want to batch this, perhaps you can queue those runs? You could create several “parent” flow runs that each trigger several child flow runs, and schedule those parent runs at different times. Or perhaps you can temporarily scale out the cluster for such a large workload and then scale back in when you’re done?
In Prefect Cloud we have concurrency limits, which would make it easy to queue flow runs and batch the process. One community user contributed a code snippet to limit flow run concurrency on Server; perhaps you can give it a try? https://github.com/PrefectHQ/server/issues/307
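One way to picture the "several parent flow runs" idea is to split the parameter list into fixed-size batches, with each batch handed to its own parent run. The sketch below is plain Python; batch_parameters and the batch size of 4 are illustrative assumptions, not part of Prefect or of the linked snippet.

```python
from typing import Dict, Iterator, List


def batch_parameters(
    parameters: List[Dict], batch_size: int
) -> Iterator[List[Dict]]:
    """Yield consecutive slices of `parameters`, each at most `batch_size` long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(parameters), batch_size):
        yield parameters[start:start + batch_size]


# Ten parameter dicts split into batches of four: sizes 4, 4 and 2.
all_parameters = [{"x": n} for n in range(10)]
batches = list(batch_parameters(all_parameters, batch_size=4))
```

Each batch could then be the parameters list that one parent run maps over, so only one batch's worth of child flow runs is created at a time.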

Sylvain Hazard

11/22/2021, 1:03 PM
Thanks for the answers and snippet! I'll check that out!

Kevin Kho

11/22/2021, 2:44 PM
Does the parent flow need the Dask executor here? I am thinking that if you have a LocalDaskExecutor with 4 threads, it would limit you to 4 subflows being created and waited on at a time. The hope is that it is executed depth-first because of the iterated mapping. Those subflows would then start on their own executor. There is currently some memory bloat with the DaskExecutor, which you might encounter when mapping over 200k elements. We have a fix around the corner for this.
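To illustrate the idea of a fixed-size thread pool capping how many child runs are in flight, here is a standard-library analogy. It does not use Prefect: fake_child_flow_run is a hypothetical stand-in for "create a flow run and wait for it", and ThreadPoolExecutor stands in for the parent flow's executor. In Prefect 1.x the corresponding setting would presumably be LocalDaskExecutor(scheduler="threads", num_workers=4).

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 4  # plays the role of the executor's thread count

lock = threading.Lock()
stats = {"running": 0, "peak": 0}


def fake_child_flow_run(x: int) -> int:
    """Stand-in for one child run: plus one, then squared."""
    with lock:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
    time.sleep(0.01)  # simulate waiting for the child run to finish
    with lock:
        stats["running"] -= 1
    return (x + 1) ** 2


# 20 pieces of work, but never more than MAX_WORKERS in flight at once.
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    results = list(pool.map(fake_child_flow_run, range(20)))
```

The pool accepts all 20 items up front but only MAX_WORKERS of them execute at any moment, which is the behavior being hoped for from a 4-thread parent executor.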

Sylvain Hazard

11/22/2021, 2:56 PM
That looks exactly like what I was aiming for: having a depth-first execution, including the sub-flow part, but limiting the number of concurrent sub-flows running. I'll definitely try that out!

Kevin Kho

11/22/2021, 3:04 PM
It’s not guaranteed, though, that it runs depth-first with LocalDaskExecutor. The guarantees are stronger for DaskExecutor.

Sylvain Hazard

11/22/2021, 3:06 PM
What could impact the ability of the engine to run depth-first? I have found that having the branches be reduced at some point often meant going back to breadth-first.

Kevin Kho

11/22/2021, 3:07 PM
Yeah, I think it’s the scale of the branches.