Thread
#prefect-community
    Yohann

    1 year ago
    Good morning everyone 🙂 I need some help with Prefect. I'm trying to set up a use case where a flow runs subflows, but I'm getting strange behavior and I don't understand why Prefect works this way. I put a small example below. My goal is to launch a flow every night that iterates over a list and, for each element, runs a new flow. Each subflow is configured with the value from this list (it's a URL). At the beginning, I was calling StartFlowRun without the idempotency_key, and subflows were only started for the first item. After that, I set the idempotency_key to the item value. I thought it was working, but I got strange behavior once my_flow was scheduled: the flow worked the first night, but on the following days the subflows were not started (as if they were stuck). Then I used a random key for the idempotency_key, and this time everything works correctly. Is this normal? I'm also not sure it's a good idea to use Prefect like this, because the GUI will be flooded with new flow runs. Does Prefect rotate the Postgres database to remove old flows?
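    from uuid import uuid4

    from prefect import Flow, task
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    from prefect.tasks.prefect import StartFlowRun


    @task
    def run_subflow(item):
        # Start a run of the registered flow "my_sub_flow" for this item and wait for it.
        # A fresh random idempotency_key means this call is never treated as a
        # duplicate of an earlier request.
        StartFlowRun(
            flow_name="my_sub_flow", project_name="test", wait=True, parameters={"item": item}
        ).run(idempotency_key=str(uuid4()))


    # Scheduled every night at midnight; one mapped task run (and one subflow run) per item.
    with Flow("my_flow", schedule=Schedule(clocks=[CronClock("0 0 * * *")])) as flow:
        run_subflow.map(item=["a", "b", "c"])

    # With a schedule attached, flow.run() keeps running locally and triggers at each scheduled time.
    flow.run()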
    Kevin Kho

    1 year ago
    Hi @Yohann, the idempotency_key is valid for 24 hours, and a new flow run won't be started if the same idempotency_key is used again within that time. Was the next call made within 24 hours of the previous one? For the last question, are you on Cloud or Server?
    Yohann

    1 year ago
    That's a good question. In my example, my_flow is scheduled every 24 hours, and each subflow takes a few seconds to execute. So, for instance, for a subflow called sub_flow with the parameter item="a", there are approximately 24 hours between two calls. But I need to run this subflow with this parameter again every 24 hours. What is the purpose of this 24-hour limit? I don't understand the concept or how it applies to a flow. Does it apply to the flow name, or to the flow plus its parameters? I use the Server version.
    Kevin Kho

    1 year ago
    The general idea is that if you have an unstable internet connection and want to prevent duplicate runs of the same flow, the idempotency key marks a request as being for the same flow run you already asked for. Each request with that idempotency key refreshes the cache for another 24 hours.
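To see why keys based only on the item stopped working on a nightly schedule, here is a toy, in-memory model of the behavior described above. It is not Prefect's implementation: the class name `ToyIdempotencyCache` is hypothetical, times are plain numbers of hours, and it assumes (as stated in the thread) that every request carrying a known key refreshes that key's 24-hour window.

```python
from __future__ import annotations

from itertools import count


class ToyIdempotencyCache:
    """Toy model of idempotent flow-run creation (hypothetical, not Prefect's code)."""

    def __init__(self, ttl_hours: float = 24.0) -> None:
        self.ttl = ttl_hours
        self._entries: dict[str, tuple[int, float]] = {}  # key -> (run_id, expires_at)
        self._ids = count(1)  # source of fake flow-run ids

    def create_flow_run(self, key: str, now: float) -> tuple[int, bool]:
        """Return (run_id, created). `now` is measured in hours."""
        entry = self._entries.get(key)
        if entry is not None and now < entry[1]:
            run_id = entry[0]
            # Duplicate request: no new run, but the window is pushed back by `ttl`.
            self._entries[key] = (run_id, now + self.ttl)
            return run_id, False
        # Unknown or expired key: create a new run and open a fresh window.
        run_id = next(self._ids)
        self._entries[key] = (run_id, now + self.ttl)
        return run_id, True


cache = ToyIdempotencyCache()
print(cache.create_flow_run("a", now=0.0))     # (1, True)
print(cache.create_flow_run("a", now=23.999))  # (1, False): inside the window
print(cache.create_flow_run("a", now=47.998))  # (1, False): the window was refreshed
print(cache.create_flow_run("b", now=47.998))  # (2, True): a different key
```

If nightly requests arrive slightly less than 24 hours apart, each one lands inside the refreshed window, so no new run is ever created for that key, which matches the "stuck" subflows; a different key always produces a new run.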
    Given that these really are different runs, you want different idempotency keys, so I think you're all good. It applies to the flow name, but you can pass literally any string that uniquely identifies the run.
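One way to follow this advice while keeping some protection against duplicates is a key that is unique per item and per scheduled day, rather than fully random. This is a sketch: `make_idempotency_key` is a hypothetical helper, and the date is assumed to come from your run (for example the flow run's scheduled start time). Unlike `str(uuid4())`, a retry on the same day reuses the same key, so it should be treated as a duplicate, while the next night's run gets a new key.

```python
from datetime import date


def make_idempotency_key(flow_name: str, item: str, run_date: date) -> str:
    """Hypothetical helper: one key per (subflow, item, calendar day)."""
    return f"{flow_name}:{item}:{run_date.isoformat()}"


# Same subflow, item and day -> same key, so a retry is deduplicated.
# A different day or item -> a different key, so a new flow run is created.
print(make_idempotency_key("my_sub_flow", "a", date(2021, 1, 1)))  # my_sub_flow:a:2021-01-01
```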
    We don't rotate the database for you. Cloud will handle the scaling for you, though. Did you know you get 10,000 free task runs every month on Cloud?
    Yohann

    1 year ago
    OK, I understand. Thank you @Kevin Kho for all these tips. I didn't know about the free task runs; I will take a look 👍
    For anyone interested in cleaning old flow runs out of the Postgres database, I found a way to do it. Call the GraphQL API with a query like this one, which returns all flow runs that ended before a given date:
    query {
      flow_run(where: { end_time: { _lt: "2021-01-01 12:00:00" } }, order_by: { end_time: asc }) {
        id
        end_time
      }
    }
    Then call the delete_flow_run mutation for each returned id to delete the old flow runs:
    mutation {
      delete_flow_run(input: { flow_run_id: "{{uuid}}" }) {
        success
        error
      }
    }