Naga Sravika Bodapati

    5 months ago
    Hi all, we have been using Prefect to schedule and monitor flows at our organization, and we have a set of questions and issues we would appreciate help with.
    1. We have a Docker agent managing flows built with Docker Run as the run config and Local storage. The agent seems to stop picking up and running flows after a few days, typically 6-7 days; even when it is still actively polling, the flows are not run. Has this issue been seen before, and if so, what is the solution? We start the Docker agent with: nohup prefect agent docker start --token <auth_token> -l <agent_label>
    2. We have a flow that tries to establish a pyodbc connection to MS SQL Server with a connection timeout of 60 seconds, and the task is configured with max_retries=2, retry_delay=timedelta(minutes=1). We expect the task to finish in under 10 minutes, but the runs go on for 15-16 hours. Why is this the case? This is also causing "cannot allocate memory" errors.
    3. We have seen scenarios where a Prefect agent suddenly goes down, which can lead to flows failing continuously. How can cases like this be avoided in the future? For example, while the agent is down, all the scheduled flow runs pile up and are tagged as Late runs. When the agent comes back up a few hours later, all the late runs start at once, and this leads to a memory issue on the machine.
    4. Is there a way in Prefect to kill a flow run if it has been running for more than 'x' amount of time?
    5. Is there a way in Prefect to stop a pipeline's schedule based on a condition (e.g. if the last three consecutive flow runs fail, stop the schedule)?
    Anna Geller

    5 months ago
    #1 Did you happen to upgrade your flow or agent? I see your command is still using --token, which is now deprecated in favor of --key. You may try to upgrade your agent and flows and create a new API key to check if that helps.
    #2 It's hard to say what the root cause of the timeout issue may be without seeing the flow code. Some users have experienced this when the DB connection was not closed. Can you share your flow code?
    #3 Many agents can be started as a service that is automatically restarted when something goes wrong, e.g. KubernetesAgent or ECSAgent. For a Docker agent, this may be more challenging, but you could start by implementing agent-level SLA failure alerts via Automations. Btw, this is handled better in Prefect 2.0 through the separation between agents and work queues.
    #4 Yes, this can be configured using a Flow SLA Automation.
    #5 Here is an example that you can use to fail a flow run if the last flow run failed. To pause the schedule instead, you would need to build a query similar to the one in this example state handler, but limit it to 3 runs, and as the action use set_schedule_inactive as in the example from the Gist:
    from datetime import timedelta
    from prefect import task, Flow
    from prefect.schedules.clocks import IntervalClock
    from prefect.schedules import Schedule
    import pendulum
    import prefect
    from prefect.client import Client
    from prefect.engine.state import Failed
    import time
    
    
    schedule = Schedule(clocks=[IntervalClock(timedelta(minutes=5))])
    
    
    def fail_if_last_flow_run_failed(obj, old_state, new_state):
        if new_state.is_running():
            client = Client()
            query = """
                query ($flow_id: uuid, $state_timestamp: timestamptz) {
                  flow_run(
                    where: {_and: [{flow_id: {_eq: $flow_id}}, {state_timestamp: {_lt: $state_timestamp}}, {state: {_neq: "Scheduled"}}]}
                    order_by: {state_timestamp: desc}
                    limit: 1
                  ) {
                    name
                    state
                    end_time
                    state_timestamp
                  }
                }
            """
            response = client.graphql(
                query=query,
                variables=dict(
                    flow_id=prefect.context.flow_id,
                    state_timestamp=pendulum.now(tz="UTC").isoformat(),
                ),
            )
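            flow_runs = response["data"]["flow_run"]
            # On the very first run there is no previous flow run to inspect.
            last_flow_run = flow_runs[0] if flow_runs else None
    
            if last_flow_run and last_flow_run["state"] == "Failed":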
                logger = prefect.context.get("logger")
                message = "Failing this flow run since the last flow run ended in Failure"
                logger.info(message)
                return Failed(message)
        return new_state
    
    
    @task(log_stdout=True)
    def hello_world():
        print("hello world")
    
    
    with Flow("hello", schedule=schedule, state_handlers=[fail_if_last_flow_run_failed]) as flow:
        hw = hello_world()
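
To extend the handler above from "last run failed" to "last three runs failed", the query would use limit: 3 and the decision could be isolated in a small helper. The following is only a sketch: last_n_runs_failed is a hypothetical name, not part of Prefect, and it assumes the states arrive newest first, as they do with order_by: {state_timestamp: desc}. The action taken when it returns True (for example set_schedule_inactive from the Gist) is left out here.

```python
from typing import Sequence


def last_n_runs_failed(states: Sequence[str], n: int = 3) -> bool:
    """Return True when the n most recent states are all "Failed".

    `states` must be ordered newest first. If fewer than n runs exist yet,
    the answer is False, so a young flow is never paused prematurely.
    """
    recent = list(states)[:n]
    return len(recent) == n and all(state == "Failed" for state in recent)


if __name__ == "__main__":
    # Shape mirrors response["data"]["flow_run"] from the query above,
    # assuming the query was run with limit: 3.
    flow_runs = [{"state": "Failed"}, {"state": "Failed"}, {"state": "Success"}]
    states = [run["state"] for run in flow_runs]
    print(last_n_runs_failed(states))
```

Inside the state handler, the list of states would come from response["data"]["flow_run"], and the schedule would only be deactivated when the helper returns True.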
    Btw, it's quite hard to answer when you ask about 5 different issues within one thread. Next time, please ask each question in a separate thread so that we can keep the conversation about each issue separate.
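
Regarding #2, one way to rule out an unclosed DB connection is to make the close unconditional. The sketch below is an assumption about what such a task body might look like, not the poster's actual flow code: run_query is a hypothetical helper, and sqlite3 stands in for pyodbc so the example runs anywhere. With pyodbc, the connect callable would presumably wrap pyodbc.connect(conn_str, timeout=60).

```python
import sqlite3
from contextlib import closing
from typing import Any, Callable, List


def run_query(connect: Callable[[], Any], sql: str) -> List[Any]:
    """Run one query and always close the cursor and the connection.

    `connect` is any zero-argument callable returning a DB-API connection.
    contextlib.closing calls .close() even when execute() raises, so a
    failed attempt (and each retry) does not leave a connection open.
    """
    with closing(connect()) as conn:
        with closing(conn.cursor()) as cursor:
            cursor.execute(sql)
            return cursor.fetchall()


if __name__ == "__main__":
    # sqlite3 is used purely as a stand-in for the real database driver.
    rows = run_query(lambda: sqlite3.connect(":memory:"), "SELECT 1")
    print(rows)
```

Whether this resolves the long-running tasks depends on the actual flow code, which has not been shared yet.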
    Naga Sravika Bodapati

    5 months ago
    Sure Anna, Thank you! I will share the flow code for point #2