Yohann
05/19/2021, 7:07 AMfrom prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from uuid import uuid4
@task
def run_subflow(item):
StartFlowRun(flow_name="my_sub_flow", project_name="test", wait=True, parameters={"item":item}).run(idempotency_key=str(uuid4()))
with Flow("my_flow", schedule=Schedule(clocks=[CronClock("0 0 * * *")]),) as flow:
run_subflow.map(item=["a", "b", "c"])
flow.run()
Kevin Kho
Yohann
05/19/2021, 3:49 PMKevin Kho
Kevin Kho
Kevin Kho
Yohann
05/19/2021, 5:13 PMYohann
05/19/2021, 7:47 PMquery {
flow_run(where: { end_time: { _lt: "2021-01-01 12:00:00" } }, order_by:{ end_time: asc },) {
id
end_time
}
}
Then you need to call mutation to delete old flows.
mutation {
delete_flow_run(input: {flow_run_id: "{{uuid}}"}){success
error}
}