I have a question about Prefect Cloud and FlowRunn...
# ask-community
t
I have a question about Prefect Cloud and FlowRunners. It looks like, if I want to loop a flow based on state, I need to run it through a FlowRunner. How would I do that through Prefect Cloud?
j
Hi Tim, are you asking about calling
flow.run
in a loop? Or looping within a single task through the
LOOP
signal?
t
Basically running a flow in batches. We see some performance issues with very large datasets in our implementation, so I want to limit the number of records I pull and fully process each batch. So at the end of the flow, if I have more records to process, I want to start the flow over.
It looks like a FlowRunner can do that, but I am unsure how to get it scheduled/managed through the UI
j
There are a few ways you might handle this within Prefect using other mechanisms that might be cleaner:
• If your data can be efficiently partitioned and loaded in batches, I'd handle this through task mapping, where each mapped pipeline handles a subset of the data. For example, if you're processing files in a directory, you might have an initial task that outputs a list of paths, and then map over those paths to process each one individually.
• If you want to keep your flow processing a subset of the data, you could use a second flow to orchestrate multiple flow runs using the StartFlowRun task (see https://docs.prefect.io/core/idioms/flow-to-flow.html#scheduling-a-flow-of-flows for an example of this).
• If you can't determine before execution how many runs of your processing flow you'll need, you might have your flow end with a call to StartFlowRun to schedule another execution.
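To make the first option concrete, here is a minimal sketch of batching through task mapping, assuming the Prefect 0.14-era API (Flow, Parameter, task, and .map). The flow name, the parameter names, and the make_batches helper are hypothetical and only illustrate the pattern; the actual fetch and process step is left as a placeholder.

```python
def make_batches(total_records, batch_size):
    """Split the range [0, total_records) into (offset, limit) pairs."""
    return [
        (offset, min(batch_size, total_records - offset))
        for offset in range(0, total_records, batch_size)
    ]


def build_flow():
    # Prefect is imported lazily so make_batches can be used without it.
    from prefect import Flow, Parameter, task

    @task
    def list_batches(total_records, batch_size):
        # Initial task: produce the list that downstream tasks map over.
        return make_batches(total_records, batch_size)

    @task
    def process_batch(batch):
        offset, limit = batch
        # Placeholder: fetch and process records [offset, offset + limit).
        return limit

    # "batched-processing" and the parameter names are hypothetical.
    with Flow("batched-processing") as flow:
        total = Parameter("total_records", default=250)
        size = Parameter("batch_size", default=100)
        batches = list_batches(total, size)
        process_batch.map(batches)  # one mapped task run per batch

    return flow
```

Each mapped run handles one bounded slice, so no single task has to hold the whole dataset.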
t
Will calling
StartFlowRun
not introduce a recursive stack of flows? Or will it give me a clean flow?
j
If you don't set
wait=True
, it will only schedule the new flow run and return immediately.
t
perfect! thank you!
I am running this, but it doesn't seem to be actually running the flow locally.
@task(name='replay')
def replay(limit, num_pages, myflow=None):
    if not limit:
        return

    if limit < num_pages:
        StartFlowRun(flow_name=myflow)
    else:
        return
j
Sorry,
StartFlowRun
is a task itself, so you'll want to add it as a dependent task of your flow.
t
ok, so how would I trigger it based on that evaluation?
j
You might also use
prefect.Client.create_flow_run
directly yourself if you want to avoid creating an extra task or want to make larger programmatic decisions within a single task.
Oh wait, you could also just add a
.run()
call to the end of the task you have there and things should work fine.
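A rough sketch of that last suggestion, assuming the Prefect 0.14-era StartFlowRun task from prefect.tasks.prefect and a Cloud/Server backend. The helper names (should_replay, schedule_next_run, replay_body) and the project argument are hypothetical; in a real flow, replay_body would be the body of the @task-decorated function.

```python
def should_replay(limit, num_pages):
    """Mirror the check in the task above: replay only if limit < num_pages."""
    return bool(limit) and limit < num_pages


def schedule_next_run(flow_name, project_name):
    # Imported lazily so the decision logic can be exercised without Prefect.
    from prefect.tasks.prefect import StartFlowRun

    # Constructing StartFlowRun only builds a task object; calling .run()
    # is what actually asks the backend to create the new flow run.
    return StartFlowRun(flow_name=flow_name, project_name=project_name).run()


def replay_body(limit, num_pages, myflow=None, project=None):
    # Hypothetical body for the 'replay' task shown earlier in the thread.
    if should_replay(limit, num_pages):
        return schedule_next_run(myflow, project)
    return None
```

Keeping the condition in a plain function makes it easy to check without a backend, while the scheduling call stays isolated in one place.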
t
ok
last thing... is there a way to get the project name from the context of the flow run?
j
Yep, it was added in 0.14.11:
prefect.context["project_name"]
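For illustration, a small sketch of reading that key defensively, assuming prefect.context behaves like a mapping with .get. The fallback value "my-project" and both helper names are hypothetical; the fallback covers older versions or runs where the key is not set.

```python
def current_project(context, default=None):
    """Return the project name from a context-like mapping, or a default."""
    return context.get("project_name", default)


def project_for_this_run(default="my-project"):
    # Imported lazily so current_project can be tested with a plain dict.
    import prefect

    # "my-project" is a hypothetical fallback for runs where the key is absent.
    return current_project(prefect.context, default)
```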
t
ah, it looks like I have 0.14.6 locally. thank you!
shoot, how do I set a project locally?
j
Meaning for
flow.run()
execution?
t
yeah
j
StartFlowRun
won't work for local runs; it is a Cloud/Server-only feature.
t
ah, ok
I will test it out on the cloud then
thank you so much for the help
j
No problem, please let us know if you run into any other issues.