https://prefect.io logo
#prefect-community
Title
# prefect-community
z

Zach Schumacher

05/01/2022, 3:26 PM
Is there an easy way to rerun all the flows in a project for a cloud backend?
a

Anna Geller

05/01/2022, 4:06 PM
Unfortunately, there isn't. The easiest way would be to write a GraphQL query to list all active flows in the project (taking only the most recent version of each flow), and then return the flow names as a list. Then you could start a flow run for each in parallel using mapping and create_flow_run. Here is an incomplete code snippet you can start with:
Copy code
from prefect.tasks.prefect import create_flow_run
from prefect import Flow, unmapped, task, Client


@task
def get_all_flows_for_project():
    query = """query {
              flow(where: { project: { name: {_eq: "your_project_name"} } }) {
                    id
                    name
                    version
                  }
                }"""
    client = Client()
    response = client.graphql(query)
    return response


with Flow("xxx") as flow:
    flows = get_all_flows_for_project()
    create_flow_run.map(flow_name=flows, project_name=unmapped("your_project_name"))
may I ask what happened that you need to rerun all flows in a project? did you have some sort of DB outage?
z

Zach Schumacher

05/01/2022, 4:36 PM
we had a k8s issue last night
👍 1
i settled on this
Copy code
import logging

from prefect import Client
from typing import TypedDict, List
from logging import getLogger


logger = getLogger(__name__)


class Flow(TypedDict):
    name: str
    id: str


def get_flows_by_project_name(c: Client, project_name: str) -> List[Flow]:
    flows = c.graphql(
        """
        query {
          flow (
              where: { 
              archived: { _eq: false },
              project: { name: { _eq: "%s" } } }
          ) 
          { name, id }
        }
        """ % project_name
    )["data"]["flow"]
    <http://logger.info|logger.info>(f"{len(flows)} flows found in {project_name}")
    return flows


def create_flow_run(c: Client, flow: Flow):
    <http://logger.info|logger.info>(f"Creating flow run for {flow}")
    c.create_flow_run(flow["id"])


def start_all_flows_in_project(token: str, project_name: str):
    c = Client(api_token=token)
    flows = get_flows_by_project_name(c, project_name)
    for flow in flows:
        create_flow_run(c, flow)
a

Anna Geller

05/01/2022, 5:05 PM
nice, thanks for sharing! 🙌 and sorry to hear you need to troubleshoot Kubernetes cluster issues over the weekend, hope you'll get it resolved quickly
12 Views