Zach Schumacher

    4 months ago
    Is there an easy way to rerun all the flows in a project for a cloud backend?
    Anna Geller

    4 months ago
    Unfortunately, there isn't. The easiest way would be to write a GraphQL query to list all active flows in the project (taking only the most recent version of each flow), and then return the flow names as a list. Then you could start a flow run for each in parallel using mapping and create_flow_run. Here is an incomplete code snippet you can start with:
    from prefect.tasks.prefect import create_flow_run
    from prefect import Flow, unmapped, task, Client
    
    
    @task
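    def get_all_flows_for_project():
        # Filter out archived flows: registering a new version normally archives
        # the older ones, so this keeps the most recent version of each flow.
        query = """query {
                  flow(where: { archived: {_eq: false}, project: { name: {_eq: "your_project_name"} } }) {
                        id
                        name
                        version
                      }
                    }"""
        client = Client()
        response = client.graphql(query)
        # Return a plain list of flow names so the result can be mapped over
        return [flow["name"] for flow in response["data"]["flow"]]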
    
    
    with Flow("xxx") as flow:
        flows = get_all_flows_for_project()
        create_flow_run.map(flow_name=flows, project_name=unmapped("your_project_name"))
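    If the query result still contains several versions of the same flow, the "most recent version of each flow" step could look roughly like this. It is only a sketch: it assumes each row has the `id`, `name` and `version` fields requested in the query, and the helper name `latest_flows` is made up for illustration, not part of Prefect.

```python
from typing import Dict, Iterable, List


def latest_flows(rows: Iterable[dict]) -> List[dict]:
    """Keep only the highest-version row for each flow name (hypothetical helper)."""
    latest: Dict[str, dict] = {}
    for row in rows:
        current = latest.get(row["name"])
        # Replace the stored row when this one has a higher version number
        if current is None or row["version"] > current["version"]:
            latest[row["name"]] = row
    # Sort by name so the result is deterministic
    return [latest[name] for name in sorted(latest)]
```

    The flow names to map over would then be `[row["name"] for row in latest_flows(rows)]`.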
    May I ask what happened that you need to rerun all flows in a project? Did you have some sort of DB outage?
    Zach Schumacher

    4 months ago
    we had a k8s issue last night
    i settled on this
    from logging import getLogger
    from typing import List, TypedDict
    
    from prefect import Client
    
    
    logger = getLogger(__name__)
    
    
    class Flow(TypedDict):
        name: str
        id: str
    
    
    def get_flows_by_project_name(c: Client, project_name: str) -> List[Flow]:
        flows = c.graphql(
            """
            query {
              flow (
                  where: { 
                  archived: { _eq: false },
                  project: { name: { _eq: "%s" } } }
              ) 
              { name, id }
            }
            """ % project_name
        )["data"]["flow"]
        logger.info(f"{len(flows)} flows found in {project_name}")
        return flows
    
    
    def create_flow_run(c: Client, flow: Flow):
        logger.info(f"Creating flow run for {flow}")
        c.create_flow_run(flow["id"])
    
    
    def start_all_flows_in_project(token: str, project_name: str):
        c = Client(api_token=token)
        flows = get_flows_by_project_name(c, project_name)
        for flow in flows:
            create_flow_run(c, flow)
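    One possible variation on the loop above, sketched under the assumption that a single failed call should not stop the remaining flows from being started. The name `start_flows` is hypothetical, and the run-creating function is passed in as a plain callable so the sketch does not depend on Prefect itself:

```python
import logging
from typing import Callable, Iterable, List, Tuple

logger = logging.getLogger(__name__)


def start_flows(
    create_run: Callable[[str], object], flow_ids: Iterable[str]
) -> Tuple[List[str], List[str]]:
    """Call create_run for every flow id; return (started, failed) id lists."""
    started: List[str] = []
    failed: List[str] = []
    for flow_id in flow_ids:
        try:
            create_run(flow_id)
        except Exception:
            # Log the traceback and carry on with the next flow
            logger.exception("Could not create a flow run for %s", flow_id)
            failed.append(flow_id)
        else:
            started.append(flow_id)
    return started, failed
```

    With the client from the snippet above it could be called as `start_flows(c.create_flow_run, [f["id"] for f in flows])`, and the `failed` list retried afterwards.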
    Anna Geller

    4 months ago
    nice, thanks for sharing! 🙌 and sorry to hear you need to troubleshoot Kubernetes cluster issues over the weekend, hope you'll get it resolved quickly