Hello ! Is it possible to use the `create_flow_run...
# prefect-server
s
Hello! Is it possible to use the `create_flow_run` paradigm to create multiple flow runs with differing parameters using `map`? Been trying it out and can't seem to make it work. Code below.
from prefect import Flow, task, Parameter, unmapped
import prefect
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task
def plus_one(x: int):
    return x+1

@task
def squared(x: int):
    return x**2

@task
def _print(x: str):
    logger = prefect.context.get("logger")
    logger.info(x)
    

with Flow(name="Plus one squared") as plus_one_flow:
    x = Parameter("x", default=1)
    plus_one_t = plus_one(x)
    sq = squared(plus_one_t)
    _print(sq)

plus_one_flow.register(project_name="tmp")

@task
def get_numbers():
    return [1, 2 ,3, 4]

with Flow(name="Mapped plus one") as mapped_flow:
    numbers = get_numbers()
    mapped_flows = create_flow_run.map(flow_name=unmapped("Plus one squared"), project_name=unmapped("tmp"), parameters={"x": numbers})
    mapped_wait_task = wait_for_flow_run(mapped_flows, raise_final_state=True)

mapped_flow.run()
And the error I get:
[2021-11-22 12:00:27+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Mapped plus one'
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'get_numbers': Starting task run...
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'get_numbers': Finished task run for task with final state: 'Success'
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2021-11-22 12:00:27+0100] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Mapped'
[2021-11-22 12:00:27+0100] ERROR - prefect.FlowRunner | Unexpected error: KeyError(0)
Traceback (most recent call last):
  File "/home/sylvain/.pyenv/versions/prefect/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/sylvain/.pyenv/versions/prefect/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/home/sylvain/.pyenv/versions/prefect/lib/python3.8/site-packages/prefect/utilities/executors.py", line 668, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
[2021-11-22 12:00:27+0100] ERROR - prefect.Mapped plus one | Unexpected error occured in FlowRunner: KeyError(0)
a
You are close! 🙂 If you want to map over parameters, then `parameters` must be a list of dictionaries. Here is a tiny example:
• a child flow that should correspond to your actual flow that you want to pass various parameters to
from prefect import Flow, Parameter, task


@task(log_stdout=True)
def hello_world(user_input: str):
    print(f"hello {user_input}!")


with Flow("dummy-child-flow") as flow:
    param = Parameter("user_input", default="world")
    hw = hello_world(param)
• parent flow to trigger flow runs
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [dict(user_input="Prefect"),
                  dict(user_input="Marvin"),
                  dict(user_input="World"),
                  ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
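To see why the original attempt raised `KeyError(0)`: judging by the traceback, mapping indexes each mapped argument by position (`upstream_state.result[i]`), and a single dict like `{"x": [1, 2, 3, 4]}` has no key `0`. Below is a minimal plain-Python sketch of the difference, with no Prefect required. `to_parameter_dicts` is a hypothetical helper, not part of Prefect; in a real flow it would presumably be wrapped in `@task` so it can run on the output of `get_numbers`.

```python
from typing import Any, Dict, List


def to_parameter_dicts(name: str, values: List[Any]) -> List[Dict[str, Any]]:
    """Hypothetical helper: build one parameters dict per child flow run."""
    return [{name: value} for value in values]


numbers = [1, 2, 3, 4]

# What the original code passed: a single dict holding the whole list.
single_dict = {"x": numbers}
try:
    single_dict[0]  # positional indexing, like result[i] in the traceback
    indexing_error = None
except KeyError as exc:
    indexing_error = exc

# What mapping needs: a list with one dict per run.
parameter_dicts = to_parameter_dicts("x", numbers)

print(repr(indexing_error))
print(parameter_dicts)
```

The resulting list can then be passed as `parameters=` to `create_flow_run.map`, with the other arguments wrapped in `unmapped` as in the parent flow above.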
s
Oh, that makes a lot of sense! It makes dealing with multiple parameters way easier on the core engine. What about the `wait_for_flow_run` part? I naively tried going for `wait_for_flow_run.map(mapped_flows, raise_final_state=True)` but it seems to fail instantly 😞
Also, just to make sure I understand correctly what's going on: does the number of flows that run concurrently depend entirely on the number of tasks the parent flow is able to run concurrently? E.g. if I have a `DaskExecutor` with a Dask cluster composed of 6 nodes, will I run 6 instances of the subflow at a time?
a
> the number of flows that run concurrently depends entirely on the number of tasks the parent flow is able to run concurrently?
The number of concurrent flow runs will be equal to the number of elements in the list you map over, here the list of parameters. Of course, your instance may not be able to handle the load if it gets overwhelmed, but conceptually there are no restrictions here.
I tested it with `wait_for_flow_run` and it looks like it works just fine, as long as you wrap the `raise_final_state` kwarg in `unmapped`:
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [
        dict(user_input="Arthur"),
        dict(user_input="Marvin"),
        dict(user_input="Ford"),
    ]
    mapped_flow_ids = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("p"),
    )
    wait_for_flow_run.map(mapped_flow_ids, raise_final_state=unmapped(True))


if __name__ == "__main__":
    flow.register("p")
s
Right, I missed the `unmapped` part. It works perfectly well now, thanks! Is there a way to restrict the number of concurrent flow runs created by this kind of pattern? In my actual use case, we will have up to ~200K elements in the parameters list, and I'm afraid our DevOps team will hate me when I try to run this on our k8s cluster.
a
If you want to batch this, perhaps you can queue those runs? You could create several “parent” flow runs that each trigger several child flow runs, and schedule those parent runs at different times. Or perhaps you can temporarily scale out the cluster for such a large workload and then scale in when you’re done?
In Prefect Cloud we have concurrency limits, which would make it easy to queue flow runs and batch the process. One community user contributed this code snippet to limit flow run concurrency on Server, perhaps you can give it a try? https://github.com/PrefectHQ/server/issues/307
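As a rough illustration of the batching idea, here is a plain-Python sketch that splits a long parameters list into fixed-size chunks. `batched` and the batch size are hypothetical choices for illustration; how each chunk is handed to a separate parent flow run (for example as a flow parameter) is left as an assumption.

```python
from typing import Any, Dict, Iterator, List


def batched(
    items: List[Dict[str, Any]], batch_size: int
) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of `items`, each at most `batch_size` long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Small stand-in for the ~200K parameter dicts from the real use case.
all_parameters = [{"x": n} for n in range(10)]
batches = list(batched(all_parameters, batch_size=4))

print([len(batch) for batch in batches])  # → [4, 4, 2]
```

Each batch would then bound how many child flow runs a single parent run creates, so the overall load depends on how many parent runs are active at once.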
s
Thanks for the answers and snippet! I'll check that out!
k
Does the parent flow need the Dask executor here? I am thinking that if you have a `LocalDaskExecutor` with 4 threads, it would limit things to 4 subflows being created and waited on at a time. The hope is that it executes depth-first because of the iterated mapping. Those subflows would then start on their own executor. There is currently some memory bloat with the `DaskExecutor`, which you might encounter when mapping over 200k elements. We have a fix around the corner for this.
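The effect of a fixed thread count can be illustrated without Prefect. The sketch below swaps in the standard library's `ThreadPoolExecutor` as a stand-in for the parent flow's executor, and `fake_subflow` is a hypothetical placeholder for "create a child flow run and wait for it". In Prefect 1.x the corresponding setting is, I believe, `LocalDaskExecutor(scheduler="threads", num_workers=4)`, but treat that as an assumption to check against the docs.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 4  # analogous to the 4 threads mentioned above

lock = threading.Lock()
counters = {"in_flight": 0, "peak": 0}


def fake_subflow(n: int) -> int:
    """Hypothetical stand-in for creating a child flow run and waiting on it."""
    with lock:
        counters["in_flight"] += 1
        counters["peak"] = max(counters["peak"], counters["in_flight"])
    time.sleep(0.01)  # pretend the child flow run takes a while
    with lock:
        counters["in_flight"] -= 1
    return n


# 20 "subflows" are submitted, but at most MAX_WORKERS run at any moment.
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    results = list(pool.map(fake_subflow, range(20)))

print(f"finished {len(results)} runs, peak concurrency {counters['peak']}")
```

This only models the cap on concurrency; it says nothing about whether the real executor schedules mapped tasks depth-first.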
s
That looks exactly like what I was aiming for: depth-first execution, including the sub-flow part, while limiting the number of concurrent sub-flows running. I'll definitely try that out!
k
It’s not guaranteed, though, that it runs depth-first with `LocalDaskExecutor`. The guarantees are stronger for `DaskExecutor`.
s
What could impact the engine's ability to run depth-first? I have found that having the branches reduced at some point often means going back to breadth-first.
k
Yeah, I think it’s the scale of the branches.