Emma Rizzi
02/16/2022, 3:47 PM
Kevin Kho
create_flow_run() doesn't wait for the flow run to finish, so you can just fire off a lot of them consecutively. Or do you need to limit the number of concurrent runs?
Emma Rizzi
02/16/2022, 4:07 PM
wait_for_flow_run would do the trick then! A simple loop with create_flow_run, I didn't think of that, thanks!
Kevin Kho
from prefect import task, Flow


@task
def add_one(x):
    res = x + 1
    print(res)
    return res


@task
def add_two(x):
    res = x + 2
    print(res)
    return res


with Flow("forloop") as flow:
    inputs = [1, 2, 3, 4, 5]
    tasks = []
    for _in in inputs:
        a = add_one(_in)
        b = add_two(a)
        tasks.append(a)
        tasks.append(b)

    # Set dependencies: chain every task to the previous one so the
    # iterations run strictly in order (must happen inside the Flow context)
    for i in range(1, len(tasks)):
        tasks[i].set_upstream(tasks[i - 1])

flow.run()
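Kevin's set_upstream loop links every task to the one before it, which leaves exactly one valid execution order. A minimal stdlib sketch of that idea, using Python's `graphlib` (3.9+) as a stand-in for Prefect's dependency resolution; `chain_order` is a hypothetical helper and the task names are plain labels, not Prefect tasks:

```python
from graphlib import TopologicalSorter
from typing import List


def chain_order(task_names: List[str]) -> List[str]:
    """Return the execution order implied by chaining each task to its predecessor."""
    if not task_names:
        return []
    sorter = TopologicalSorter()
    sorter.add(task_names[0])  # the first task has no upstream dependency
    for i in range(1, len(task_names)):
        # Same idea as tasks[i].set_upstream(tasks[i - 1]) in the flow
        sorter.add(task_names[i], task_names[i - 1])
    return list(sorter.static_order())


tasks = []
for x in [1, 2, 3]:
    tasks.append(f"add_one({x})")
    tasks.append(f"add_two({x})")

print(chain_order(tasks))
# ['add_one(1)', 'add_two(1)', 'add_one(2)', 'add_two(2)', 'add_one(3)', 'add_two(3)']
```

Because each element depends only on its predecessor, `static_order()` can only return the list in its original order, which is the sequential behaviour the chain gives Kevin's flow.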
create_flow_run and wait_for_flow_run calls
Emma Rizzi
02/16/2022, 4:13 PM
import prefect
from prefect import task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def iterate_on_months(months_range):
    logger = prefect.context.get("logger")
    for month in months_range:
        logger.info("Launch flow for month: " + month)
        # create_flow_run returns the ID of the new flow run
        flow_run_id = create_flow_run.run(
            flow_name="Water Coverage Copernicus Land",
            parameters={"start_month": month},
            run_name="Subflow - " + month,
        )
        wait_for_flow_run.run(flow_run_id)
In the logs, I can confirm that my loop iterated multiple times and that new flows were launched for the different dates printed, but wait_for_flow_run seems to wait only for the first one (date 202010): the line Flow 'Subflow - 202010': Your flow run finished a few seconds ago is printed on each iteration instead of waiting for the new run.
When I checked the logs of the subflow, only one flow run appears (the 202010 flow run).
Do you have any insight into what is happening here?
Also, I had to add .run() to create_flow_run and wait_for_flow_run to avoid an error during execution, in case this could be it.
Anna Geller
Emma Rizzi
02/17/2022, 2:27 PM
Anna Geller
Emma Rizzi
02/17/2022, 2:40 PM
Anna Geller
from typing import List
import time

from prefect import task, Flow, Parameter, unmapped
from prefect.tasks.prefect import StartFlowRun
from prefect.engine.signals import ENDRUN
from prefect.engine.state import Failed


@task(log_stdout=True)
def process_current_month(curr_month: str):
    print(f"Processing {curr_month}...")
    if curr_month == "202202":
        # Simulate a failure for one month by ending the task run in a Failed state
        raise ENDRUN(
            Failed(
                message=f"Month {curr_month} failed for some reason. Ending the task run"
            )
        )
    time.sleep(2)


# Child flow: processes the single month passed in as a parameter
with Flow("Water Coverage Copernicus Land") as child_flow:
    param = Parameter("start_month", default="202201")
    process_current_month(param)


@task
def get_params(months_to_backfill: List[str]):
    # One parameter dict per child flow run
    return [{"start_month": m} for m in months_to_backfill]


@task
def get_custom_run_names(months_to_backfill: List[str]):
    # One run name per child flow run, in the same order as the parameters
    return [f"Subflow - {m}" for m in months_to_backfill]


# wait=True makes each child-run task wait until its flow run has finished
start_flow_run = StartFlowRun(project_name="community", wait=True)

# Parent flow: starts one child flow run per month by mapping over the parameters
with Flow("Water Coverage Copernicus Land - Backfill") as parent:
    backfill_params = Parameter(
        "months_to_backfill", default=["202201", "202202", "202203"]
    )
    child_flow_params = get_params(backfill_params)
    child_flow_run_names = get_custom_run_names(backfill_params)
    child_flow_runs = start_flow_run.map(
        flow_name=unmapped("Water Coverage Copernicus Land"),
        parameters=child_flow_params,
        run_name=child_flow_run_names,
    )

if __name__ == "__main__":
    child_flow.register("community")
    parent.register("community")
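A likely cause of the single child run in Emma's loop, offered as an assumption because it is not spelled out in the thread as captured: in Prefect 1.x, when `create_flow_run` is called without an `idempotency_key`, the key defaults to the current task run's ID (plus the map index when mapped). Every iteration of a plain loop inside one task run would then send the same key, and the backend would hand back the first flow run (202010) each time, which also fits `wait_for_flow_run` repeatedly reporting that run as finished. Mapping, as in Anna's version, runs each child launch as its own task run, so each gets its own key; passing a distinct `idempotency_key` per month should have a similar effect. The sketch below only simulates that behaviour with a hypothetical `FakeBackend`; it is not Prefect code.

```python
import itertools
from typing import Dict, List, Optional


class FakeBackend:
    """Hypothetical stand-in for a backend that deduplicates runs by idempotency key."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._runs_by_key: Dict[str, str] = {}

    def create_flow_run(self, run_name: str, idempotency_key: Optional[str]) -> str:
        if idempotency_key is not None and idempotency_key in self._runs_by_key:
            # Key already seen: return the existing run instead of creating a new one
            return self._runs_by_key[idempotency_key]
        run_id = f"run-{next(self._ids)}-{run_name}"
        if idempotency_key is not None:
            self._runs_by_key[idempotency_key] = run_id
        return run_id


def loop_in_one_task(backend: FakeBackend, months: List[str],
                     task_run_id: str = "task-run-1",
                     key_per_month: bool = False) -> List[str]:
    """Plain loop inside one task run; by default every call reuses the task run ID as key."""
    ids = []
    for m in months:
        key = f"{task_run_id}-{m}" if key_per_month else task_run_id
        ids.append(backend.create_flow_run(f"Subflow - {m}", key))
    return ids


def mapped(backend: FakeBackend, months: List[str],
           task_run_id: str = "task-run-1") -> List[str]:
    """Mapped launches: each child gets its own map index appended to the key."""
    return [backend.create_flow_run(f"Subflow - {m}", f"{task_run_id}-{i}")
            for i, m in enumerate(months)]


months = ["202010", "202011", "202012"]
print(loop_in_one_task(FakeBackend(), months))
# ['run-1-Subflow - 202010', 'run-1-Subflow - 202010', 'run-1-Subflow - 202010']
print(mapped(FakeBackend(), months))
# ['run-1-Subflow - 202010', 'run-2-Subflow - 202011', 'run-3-Subflow - 202012']
```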
However, this implementation assumes that you want to carry on backfilling data for March even if February failed. If this is not what you want, LMK.
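The two behaviours Anna distinguishes can be modelled in plain Python: keep going after a failed month (what her note says the mapped version does) or stop and leave the later months unprocessed. `backfill` and `process_month` are hypothetical helpers that only model the outcome; they say nothing about how Prefect schedules the child runs.

```python
from typing import Callable, Dict, List


def backfill(months: List[str], process: Callable[[str], None],
             stop_on_failure: bool) -> Dict[str, str]:
    """Run `process` for each month in order and record 'Success', 'Failed' or 'Skipped'."""
    states: Dict[str, str] = {}
    failed = False
    for month in months:
        if failed and stop_on_failure:
            states[month] = "Skipped"  # an earlier month failed, so don't start this one
            continue
        try:
            process(month)
            states[month] = "Success"
        except Exception:
            states[month] = "Failed"
            failed = True
    return states


def process_month(month: str) -> None:
    # Hypothetical stand-in mirroring Anna's child flow: 202202 always fails
    if month == "202202":
        raise RuntimeError(f"Month {month} failed for some reason")


months = ["202201", "202202", "202203"]
print(backfill(months, process_month, stop_on_failure=False))
# {'202201': 'Success', '202202': 'Failed', '202203': 'Success'}
print(backfill(months, process_month, stop_on_failure=True))
# {'202201': 'Success', '202202': 'Failed', '202203': 'Skipped'}
```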
The image shows what I mean:
Emma Rizzi
02/17/2022, 3:18 PM
Anna Geller
from prefect.executors import LocalExecutor

with Flow("Water Coverage Copernicus Land - Backfill", executor=LocalExecutor()) as parent:
    ...  # same tasks as in the backfill flow above
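As a rough analogy only (a standard-library thread pool, not Prefect's executors): with a single worker, child launches that each wait for their run, as with `wait=True`, go one after another, which is the sequential behaviour expected from `LocalExecutor`; with more workers they can overlap, as with a parallel executor. `run_children` is a hypothetical helper that reports the peak number of simulated child runs active at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def run_children(months: List[str], max_workers: int) -> int:
    """Launch one simulated child run per month; return the peak number running at once."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def child_run(month: str) -> str:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stands in for waiting on the child flow run
        with lock:
            running -= 1
        return month

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(child_run, months))  # consume results so every run completes
    return peak


months = ["202201", "202202", "202203"]
print(run_children(months, max_workers=1))  # 1: one child at a time
print(run_children(months, max_workers=3))  # can exceed 1: children overlap
```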
But note that the executor is retrieved from Storage, so it's not stored in the backend; if you changed it in your storage, e.g. to LocalDaskExecutor, your child flow runs would be triggered in parallel.
Emma Rizzi
02/17/2022, 3:52 PM