https://prefect.io logo
e

Emma Rizzi

02/16/2022, 3:47 PM
Hi! I've been reading about tasks mapping, is there an equivalent to flow mapping ? Also, when mapping a task, are all tasks launched in parallel ? I would need to control the order of execution
k

Kevin Kho

02/16/2022, 3:51 PM
Using the
create_flow_run()
doesn’t wait for it so you can just fire a lot of them consecutively. Or you do need to limit the concurrent runs?
e

Emma Rizzi

02/16/2022, 4:07 PM
i need to have one flow at a time due to data access, from what i read i beleive
wait_for_flow_run
would do the trick then! a simple loop with
create_flow_run
i didnt think of that, thanks!
k

Kevin Kho

02/16/2022, 4:11 PM
Yes you can loop in the Flow block like this:
Copy code
@task
def add_one(x):
    res = x + 1
    print(res)
    return res


@task
def add_two(x):
    res = x + 2
    print(res)
    return res


with Flow("forloop") as flow:
    inputs = [1, 2, 3, 4, 5]
    tasks = []
    for _in in inputs:
        a = add_one(_in)
        b = add_two(a)
        tasks.append(a)
        tasks.append(b)

    # set dependencies
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])

flow.run()
🦜 1
So in your case just loop over
create_flow_run
and
wait_for_flow_run
calls
e

Emma Rizzi

02/16/2022, 4:13 PM
thanks a lot fellow groot fanatic! This should be perfect 🙂
😂 1
👍 1
@Kevin Kho ugh I'm struggling a bit if you can have a look at this... I did this loop:
Copy code
@task
def iterate_on_months(months_range):
    logger = prefect.context.get("logger")
    for month in months_range:
        <http://logger.info|logger.info>("Launch flow for month: "+month)
        flow_id = create_flow_run.run(
            flow_name="Water Coverage Copernicus Land",
            parameters={
                "start_month":month
            },
            run_name="Subflow - "+month
        )
        wait_for_flow_run.run(flow_id)
In logs, I confirm that my loop iterated multiple times, and confirmed that news flows were launched according to the different dates printed, but the
wait_for_flow
seem to wait only for the first one (date 202010) as the line
Flow 'Subflow - 202010': Your flow run finished a few seconds ago
is printed each loop instead of waiting for the new one When I checked the logs of the subflow, only one flow run appears (the 202010 flow run) Do you have any insights on what is happening here ? Also I had to add
.run()
to create_flow_run and wait_for_flow_run to avoid an error during execution if this could be it
Heres a few logs from the loop, each sub flow seems to have the same flow_id
a

Anna Geller

02/17/2022, 2:14 PM
Can you share the full flow showing what you try to do? Or can you describe or draw how would you like the flow(s) to be executed? I'm sure we can find a more elegant way than calling a task in another task
@Emma Rizzi you can check out this thread - it discusses a very similar issue. Kevin is out today but if this thread doesn't help you, LMK and we can certainly find a solution together
e

Emma Rizzi

02/17/2022, 2:27 PM
@Anna Geller Thanks for the follow up! I have a flow that downloads data, do some processing and uploads it This flow is meant to be schedule every month to gather new data I need to initialize my repo with all the historic data until now before the first schedule, and tried to call my flow multiple times in a new flow so I can use it again in case data get corrupted or anything goes wrong (and avoid duplicated code) Problem is to handle data writing that needs to occur in chronological order, so mapping would create a few concurrency problems, hence the wait_for_flow_run I called the task inside a task, maybe I should put the loop directly inside the flow ?
a

Anna Geller

02/17/2022, 2:32 PM
I see, so you need to call the flow first for January, and only if this is successful then call it for February and so on?
e

Emma Rizzi

02/17/2022, 2:40 PM
yes exactly!
a

Anna Geller

02/17/2022, 3:09 PM
I have this flow so far - when you use the default Local executor rather than Dask executor, the child flow runs will be triggered sequentially.
Copy code
from prefect import task, Flow, Parameter, unmapped
from prefect.tasks.prefect import StartFlowRun
from prefect.engine.signals import ENDRUN
from prefect.engine.state import Failed
from typing import List
import time


@task(log_stdout=True)
def process_current_month(curr_month: str):
    print(f"Processing {curr_month}...")
    if curr_month == "202202":
        raise ENDRUN(
            Failed(
                message=f"Month {curr_month} failed for some reason. Ending the task run"
            )
        )
    time.sleep(2)


with Flow("Water Coverage Copernicus Land") as child_flow:
    param = Parameter("start_month", default="202201")
    process_current_month(param)


@task
def get_params(months_to_backfill: List[str]):
    return [{"start_month": m} for m in months_to_backfill]


@task
def get_curom_run_name(months_to_backfill: List[str]):
    return [f"Subflow - {m}" for m in months_to_backfill]


start_flow_run = StartFlowRun(project_name="community", wait=True)


with Flow("Water Coverage Copernicus Land - Backfill") as parent:
    backfill_params = Parameter(
        "months_to_backfill", default=["202201", "202202", "202203"]
    )
    child_flow_params = get_params(backfill_params)
    child_flow_run_names = get_curom_run_name(backfill_params)
    child_flow_runs = start_flow_run.map(
        flow_name=unmapped("Water Coverage Copernicus Land"),
        parameters=child_flow_params,
        run_name=child_flow_run_names,
    )
if __name__ == "__main__":
    child_flow.register("community")
    parent.register("community")
However, this implementation assumes that you want to carry on backfilling data for March even if February failed. If this is not what you want, LMK. The image shows what I mean:
With this implementation, you run all child flows sequentially, but when e.g. February failed, you can backfill it separately in a new run. Would this work for your use case?
e

Emma Rizzi

02/17/2022, 3:18 PM
Thanks a lot ! I'm trying this I hope it will work For now interrupting the sequence on a fail is not necessary as its meant to be run very scarcely on data initialization we can fix it manually I'll keep you updated!
However, I use a DockerRun as executor, will the subflows still be triggered sequentially ? (on a single VM instance)
a

Anna Geller

02/17/2022, 3:45 PM
Yes, definitely! The executor decides on parallelism, therefore this flow should work the same way regardless of your run config. You could even define your executor explicitly:
Copy code
with Flow("Water Coverage Copernicus Land - Backfill", executor=LocalExecutor()) as parent:
but note that the executor is retrieved from Storage so it's not stored in the backend and if you would change it in your storage e.g. to LocalDaskExecutor, then your child flow runs would be triggered in parallel
e

Emma Rizzi

02/17/2022, 3:52 PM
Thanks for clarifying!
3 Views