# prefect-community

Chu

07/25/2022, 2:54 PM
Hi, does anyone know how I can send params that contain both mapped and unmapped parameters to create_flow_run? (More in the thread)
should I do this?
Copy code
params = dict(
    orgs=['org1', 'org2', 'org3'],  # should be mapped
    start_date="2022-03-19",  # unmapped
    end_date="2022-03-20",  # unmapped
)

with Flow("orchestrator") as flow:
    mapped_flow_ids = create_flow_run.map(
        flow_name=unmapped("map flow"),
        project_name=unmapped("myProject"),
        parameters=params,  # Am I doing it right?
    )
Or should it be like this? Just wondering if there is an option that lets me update only orgs each time and keep the unmapped variables fixed.
Copy code
params = dict(
    orgs=['org1', 'org2', 'org3'],  # should be mapped
    start_date=["2022-03-19", "2022-03-19", "2022-03-19"],
    end_date=["2022-03-20", "2022-03-20", "2022-03-20"],
)

with Flow("orchestrator") as flow:
    mapped_flow_ids = create_flow_run.map(
        flow_name=unmapped("map flow"),
        project_name=unmapped("myProject"),
        parameters=params,
    )

Anna Geller

07/25/2022, 3:31 PM
I'm marking this as resolved since this is exactly the same as https://prefect-community.slack.com/archives/CL09KU1K7/p1658691545223619
TL;DR: mapping requires a list of inputs to iterate over, so your example would need to be something more like this (assuming you want to map over a list of statically defined dictionaries):
Copy code
params = [
    dict(
        orgs=['org1', 'org2', 'org3'],  # should be mapped
        start_date="2022-03-19",  # unmapped
        end_date="2022-03-20",  # unmapped
    ),
    dict(
        orgs=['org1', 'org2', 'org3'],  # should be mapped
        start_date="2022-03-19",  # unmapped
        end_date="2022-03-20",  # unmapped
    ),
    dict(
        orgs=['org1', 'org2', 'org3'],  # should be mapped
        start_date="2022-03-19",  # unmapped
        end_date="2022-03-20",  # unmapped
    ),
    ...
]

with Flow("orchestrator") as flow:
    mapped_flow_ids = create_flow_run.map(
        flow_name=unmapped("map flow"),
        project_name=unmapped("myProject"),
        parameters=params,
    )

Chu

07/25/2022, 3:38 PM
but can I do this? I think they are quite similar
Copy code
params = dict(
    orgs=['org1', 'org2', 'org3'],  # should be mapped
    start_date=["2022-03-19", "2022-03-19", "2022-03-19"],
    end_date=["2022-03-20", "2022-03-20", "2022-03-20"],
)

with Flow("orchestrator") as flow:
    mapped_flow_ids = create_flow_run.map(
        flow_name=unmapped("map flow"),
        project_name=unmapped("myProject"),
        parameters=params,
    )

Anna Geller

07/25/2022, 3:39 PM
you have a dictionary and you need a list for mapping
params must be a list

Chu

07/25/2022, 3:43 PM
Sorry, I may not quite understand. If I don't set start_date and end_date as unmapped, why can't I do it like above? These, just like orgs, need to be mapped for my child flow's parameters.

Anna Geller

07/25/2022, 3:53 PM
I didn't even look at what's inside - mapping works by iterating over a list, not a dict. Currently you're trying to iterate your mapped task over a dict, which won't work
you need to redesign it so that the input is a list
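for example, something along these lines (just a sketch - the dict keys need to match your child flow's Parameter names):
Copy code
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run

# build one parameter dict per org so the mapped task has a list to iterate over
orgs = ['org1', 'org2', 'org3']
params = [
    dict(orgs=org, start_date="2022-03-19", end_date="2022-03-20")
    for org in orgs
]

with Flow("orchestrator") as flow:
    mapped_flow_ids = create_flow_run.map(
        flow_name=unmapped("map flow"),
        project_name=unmapped("myProject"),
        parameters=params,
    )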

Chu

07/25/2022, 3:56 PM
In my above example, I think the flow will create three parallel flow runs, each of which takes an org_id, a start_date, and an end_date and runs separately. I can give you details about "map flow":
Copy code
@task(name="run dbt job")
def dbt_run(org_id, start_date, end_date):
    return DbtShellTask()

with Flow("map flow") as flow:
    org_id = Parameter("organization_id")
    start_date = Parameter("start_date")
    end_date = Parameter("end_date")

    dbt_run(org_id, start_date, end_date)

Anna Geller

07/25/2022, 4:04 PM
can you check the examples I shared and try them out? I think they provide all the info you need
mapping requires a list and parameters should have default values, unless you always provide those explicitly at runtime
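in your child flow that could look something like this (just a sketch - the default values here are only examples):
Copy code
from prefect import Flow, Parameter

with Flow("map flow") as flow:
    # hypothetical defaults - replace with whatever makes sense for your data
    org_id = Parameter("organization_id", default="org1")
    start_date = Parameter("start_date", default="2022-03-19")
    end_date = Parameter("end_date", default="2022-03-20")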

Chu

07/25/2022, 4:37 PM
thanks, it is fixed!
I defined a get_params function to help me organize params. Do you think I can put the get_params() call outside the orchestrator, or inside? I'm not sure if this will impact flow registration, since sometimes we may add more orgs to that list:
Copy code
# we define those params outside and overwrite here
organization_ids = ['id1', 'id2', 'id3']
data_processing_start_date = "2022-03-19 +0000"
data_processing_end_date = "2022-03-20 +0000"

# build one param dict per org_id
def get_params():
    params = []
    for org_id in organization_ids:
        params.append({
            'organization_id': org_id,
            'data_processing_start_date': data_processing_start_date,
            'data_processing_end_date': data_processing_end_date,
        })
    return params

# retrieve mapping params
params = get_params()

# orchestrator
with Flow("orchestrator") as flow:
    mapped_BI_model_run_ids = create_flow_run.map(
        flow_name=unmapped("xxx"),
        project_name=unmapped("xxx"),
        parameters=params,
    )

    wait_for_BI_flow_run.map(
        flow_run_id=mapped_BI_model_run_ids,
        raise_final_state=unmapped(True),
        stream_logs=unmapped(True),
    )

Anna Geller

07/26/2022, 12:47 PM
> sometimes we may add more orgs to that list
in that case, I would store the list in some data store (DB, S3, Redis, or even the Prefect KV Store if you use Cloud) and then retrieve it from there - this way, you don't have to reregister and redeploy flows when only some parameter values change. Glad you solved it!
and of course I mean it this way: you store this parametrization data in the KV Store and retrieve it in your flow in an extra task prior to this mapping - you can pass it as a data dependency
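a rough sketch of what I mean (this assumes Prefect Cloud and that the org list was stored beforehand, e.g. with set_key_value("organization_ids", ["id1", "id2", "id3"])):
Copy code
from prefect import Flow, task, unmapped
from prefect.backend import get_key_value
from prefect.tasks.prefect import create_flow_run


@task
def get_params(start_date, end_date):
    # read the current org list from the KV Store at runtime,
    # so adding orgs doesn't require reregistering the flow
    organization_ids = get_key_value("organization_ids")
    return [
        dict(
            organization_id=org,
            data_processing_start_date=start_date,
            data_processing_end_date=end_date,
        )
        for org in organization_ids
    ]


with Flow("orchestrator") as flow:
    params = get_params("2022-03-19 +0000", "2022-03-20 +0000")
    mapped_BI_model_run_ids = create_flow_run.map(
        flow_name=unmapped("xxx"),
        project_name=unmapped("xxx"),
        parameters=params,
    )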

Chu

07/26/2022, 2:57 PM
KV Store sounds cool! I will look into that. Since we need to make those go live immediately, will my proposal (adding a Python function that returns params) work? We currently have no plans for massive usage, so the parameters will be fixed. If it could work, should I put
params = get_params()
inside the Flow or outside the Flow, from a registration perspective and as a best practice? Really thank you for answering my questions, our team is really astonished by the functionality of Prefect and how supportive the community is!!!
🙌 1
🙏 1

Anna Geller

07/26/2022, 3:24 PM
yes, if your parameters are relatively fixed, there is no harm in keeping them defined alongside your flow - but keep in mind that if those change, you'll need to reregister your flow
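i.e. after editing the hard-coded list, you'd rerun registration for the flow - roughly like this (the project name is a placeholder):
Copy code
# reregister so the updated params list is baked into a new flow version
flow.register(project_name="xxx")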

Chu

07/26/2022, 6:14 PM
Thanks! Following this, I want to check one piece of syntax with you: can I set my upstream_tasks and task_args like this?
Copy code
mapped_BI_model_run_ids = create_flow_run.map(
    flow_name=unmapped("xxx"),
    project_name=unmapped("xxx"),
    upstream_tasks=unmapped([wait_for_another_mapped_flow]),
    task_args=unmapped(dict(trigger=all_successful)),
    parameters=params,
)
Thanks for help!

Anna Geller

07/27/2022, 7:24 AM
Yup, nice work, this looks correct

Chu

07/27/2022, 11:53 AM
My confusion is here: wait_for_another_mapped_flow is itself a mapped task, and we wrap it in unmapped to set upstream_tasks - will that cause any issues?

Anna Geller

07/27/2022, 12:08 PM
give it a try

Chu

07/27/2022, 5:54 PM
Thanks! Does Prefect 1.0 support wrapping some registered flows into a big task and passing this task to a parent flow for mapping, like this?
Copy code
@task()
def giant_task(params1, params2):
    create_flow_run_...
    wait_for_flow_run_...
    create_flow_run_...(trigger=all_finished, parameters=params1)
    wait_for_flow_run_...
    create_flow_run_...(trigger=all_successful, parameters=params2)
    wait_for_flow_run_...

with Flow(...) as flow:
    output = giant_task.map(params1, params2)
I tried to run that and it gives me an error:
Copy code
Could not infer an active Flow context while creating edge to <Task: wait_for_flow_run>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `wait_for_flow_run.run(...)`
I think when I add wait_for_flow_run.run(...) and create_flow_run.run(…), this could work
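what I mean is roughly this (just a sketch - the flow and project names are placeholders):
Copy code
from prefect import task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def giant_task(params1, params2):
    # .run() executes the task logic as plain Python inside this task,
    # outside of Prefect's flow context
    run_id_1 = create_flow_run.run(
        flow_name="child-flow-1", project_name="myProject", parameters=params1
    )
    wait_for_flow_run.run(run_id_1, raise_final_state=True)

    run_id_2 = create_flow_run.run(
        flow_name="child-flow-2", project_name="myProject", parameters=params2
    )
    wait_for_flow_run.run(run_id_2, raise_final_state=True)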

Anna Geller

07/27/2022, 7:05 PM
you can't run tasks from tasks
create_flow_run and wait_for_flow_run are tasks and can only run from a flow
if you need more examples on that, check https://discourse.prefect.io/tag/create_flow_run

Chu

07/27/2022, 7:14 PM
Thanks, what if what I want is something like that? https://prefect-community.slack.com/archives/CL09KU1K7/p1658855801534149

Mansour Zayer

07/27/2022, 7:45 PM
@Anna Geller The issue that we're addressing here is parallelization in a flow of flows. When we use mapping like this:
Copy code
with Flow(...) as flow:
    create_flow_run_1.map(input_list)
    wait_for_flow_run_1.map(input_list)
    create_flow_run_2.map(input_list)
    wait_for_flow_run_2.map(input_list)
    create_flow_run_3.map(input_list)
    wait_for_flow_run_3.map(input_list)
The DaskExecutor runs the first flow_run on n workers for the first n elements of the list, then runs the second flow_run on n workers, etc. We want all flow_runs to run for the first element first. That's why we're trying to wrap the create_flow_runs in a task. How do you propose we achieve this goal? (If my description isn't clear, please let me know)

Anna Geller

07/28/2022, 12:37 AM
perhaps this is the type of syntax you're looking for?
Copy code
from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor


@task
def generate_thousand_numbers(start, stop, step):
    nrs = range(start, stop, step)
    return list(nrs)


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    flow_run_1 = generate_thousand_numbers(1, 1000, 1)
    flow_run_2 = generate_thousand_numbers(1000, 2000, 1)
    flow_run_3 = generate_thousand_numbers(2000, 3000, 1)
    flow_run_4 = generate_thousand_numbers(3000, 4000, 1)
    flow_run_5 = generate_thousand_numbers(4000, 5000, 1)
    # ... until 8
    parameters = [
        dict(list_of_numbers=flow_run_1),
        dict(list_of_numbers=flow_run_2),
        dict(list_of_numbers=flow_run_3),
        dict(list_of_numbers=flow_run_4),
        dict(list_of_numbers=flow_run_5),
        # ... until 8
    ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
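and if you also want the parent flow to wait for those child runs to finish, you could add a wait_for_flow_run.map over the mapped run IDs in the same Flow block - roughly like this (untested sketch, reusing the pattern from earlier in the thread):
Copy code
from prefect import Flow, unmapped
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    mapped_flows = create_flow_run.map(
        parameters=parameters,  # the same list of dicts as above
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
    # block until each mapped child run reaches a final state
    wait_for_flow_run.map(
        flow_run_id=mapped_flows,
        raise_final_state=unmapped(True),
        stream_logs=unmapped(True),
    )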