https://prefect.io logo
c

Chris Jordan

01/05/2021, 3:15 PM
Is there a way to map
StartFlowRun
? I've got a task
Copy code
spawn_time_series_import = StartFlowRun(
        project_name="python_imports", 
        flow_name="blast_metric_series_flow")
which takes an argument
Copy code
spawn = spawn_time_series_import(
                parameters=dict(action_id=blast_id))
and a task that returns a list of `blast_id`s with which I want to spawn many flows
Copy code
blast_id_list = push_records_to_summary_table(imported)
I want to put all this together in something like this, but I'm having trouble finding the right syntax
Copy code
spawn_time_series_import = StartFlowRun(
        project_name="python_imports", 
        flow_name="blast_metric_series_flow")



with Flow("blast_import_flow",
        schedule=daily_schedule,
        state_handlers=[cloud_only_slack_handler]
        ) as flow:

    [[[stuff]]]
    blast_id_list = push_records_to_summary_table(imported)
    spawn = spawn_time_series_import.map(
                parameters=dict(action_id=blast_id_list))
is there an accepted syntax for this?
j

Jenny

01/05/2021, 4:12 PM
Hi @Chris Jordan - I think this explanation and example should answer your question: https://prefect-community.slack.com/archives/CL09KU1K7/p1607727047066800 Let us know if you've any other questions.
c

Chris Jordan

01/05/2021, 4:17 PM
Thanks for the response @Jenny. I'm trying to do something similar to this, but not quite the same. In this case I'm not interested in getting the results out of the flow run I started, as OP in that thread is. The challenge I'm struggling with is that I have a task result that contains a list of N items, and I want to feed that to N instances of
StartFlowRun
. If I had used a task instead of a flow for this, I could use map (and that is my fallback plan), but I would prefer to have each of these be their own flow run if possible.
z

Zanie

01/05/2021, 4:41 PM
It looks like your issue is that you’re trying to pass the mapped argument in a dict which as a simple example looks like — this won’t work. I’ll create a passing example in a second.
Copy code
import prefect
from prefect import task, Flow


tasks = [i for i in range(3)]


@task
def display(params):
    <http://prefect.context.logger.info|prefect.context.logger.info>(f"{params['i']}")


with Flow("mapped") as flow:
    display.map(params=dict(i=tasks))

flow.run()
👀 1
Copy code
import prefect
from prefect import task, Flow


tasks = [i for i in range(3)]


@task
def display(params):
    <http://prefect.context.logger.info|prefect.context.logger.info>(f"{params['i']}")


@task
def package_into_params(i):
    return dict(i=i)


with Flow("mapped") as flow:
    packaged_values = package_into_params.map(tasks)
    display.map(params=packaged_values)

flow.run()
c

Chris Jordan

01/05/2021, 4:45 PM
I'll see what I can do with that - thank you @Zanie
That seems to be doing it - thanks a bunch!
👍 1
9 Views