Zach Schumacher

05/01/2022, 3:26 PM
Is there an easy way to rerun all the flows in a project for a cloud backend?

Anna Geller

05/01/2022, 4:06 PM
Unfortunately, there isn't. The easiest way would be to write a GraphQL query to list all active flows in the project (taking only the most recent version of each flow), and then return the flow names as a list. Then you could start a flow run for each in parallel using mapping and create_flow_run. Here is an incomplete code snippet you can start with:
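from prefect import Client, Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run

@task
def get_all_flows_for_project():
    # Non-archived flows only, i.e. the most recent version of each flow
    query = """query {
              flow(where: { archived: {_eq: false},
                            project: { name: {_eq: "your_project_name"} } }) {
                name
              }
            }"""
    client = Client()
    response = client.graphql(query)
    # Return just the flow names so they can be mapped over
    return [f["name"] for f in response.data.flow]

with Flow("xxx") as flow:
    flows = get_all_flows_for_project()
    create_flow_run.map(flow_name=flows, project_name=unmapped("your_project_name"))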
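The "most recent version of each flow" step could also be done client-side instead of in the query. A minimal sketch, assuming the query is extended to return a `version` field for every flow record; the `latest_flow_versions` helper and the sample records are hypothetical and only illustrate the filtering:

```python
from typing import Dict, List


def latest_flow_versions(flows: List[dict]) -> List[dict]:
    """Keep only the highest-version record for each flow name."""
    latest: Dict[str, dict] = {}
    for record in flows:
        current = latest.get(record["name"])
        if current is None or record["version"] > current["version"]:
            latest[record["name"]] = record
    return list(latest.values())


# Hypothetical records, shaped like a flow query result with name, id and version selected
sample = [
    {"name": "etl", "id": "a1", "version": 1},
    {"name": "etl", "id": "a2", "version": 2},
    {"name": "report", "id": "b1", "version": 1},
]
latest = latest_flow_versions(sample)
flow_names = [f["name"] for f in latest]
```

The resulting `flow_names` list is what would be passed to the mapped `create_flow_run` call.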
may I ask what happened that you need to rerun all flows in a project? did you have some sort of DB outage?

Zach Schumacher

05/01/2022, 4:36 PM
we had a k8s issue last night
i settled on this
from logging import getLogger
from typing import List, TypedDict

from prefect import Client

logger = getLogger(__name__)

class Flow(TypedDict):
    name: str
    id: str

def get_flows_by_project_name(c: Client, project_name: str) -> List[Flow]:
    # Non-archived flows are the latest registered version of each flow
    flows = c.graphql(
        """
        query {
          flow (
              where: {
              archived: { _eq: false },
              project: { name: { _eq: "%s" } } }
          )
          { name, id }
        }
        """ % project_name
    ).data.flow
    logger.info(f"{len(flows)} flows found in {project_name}")
    return flows

def create_flow_run(c: Client, flow: Flow):
    logger.info(f"Creating flow run for {flow}")
    # Client.create_flow_run schedules a run for the given flow ID
    c.create_flow_run(flow_id=flow["id"])

def start_all_flows_in_project(token: str, project_name: str):
    c = Client(api_token=token)
    flows = get_flows_by_project_name(c, project_name)
    for flow in flows:
        create_flow_run(c, flow)
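One thing a loop like this may need is error isolation, so that a single failing API call does not stop the remaining flows from being started. A rough sketch under that assumption: `start_all` and `fake_create_run` are hypothetical names, and the run-creating function is passed in as a callable so the loop can be tried without a Prefect backend.

```python
import logging
from typing import Callable, Dict, List

logger = logging.getLogger(__name__)


def start_all(flows: List[dict], create_run: Callable[[dict], str]) -> Dict[str, List[str]]:
    """Try to start every flow; collect failures instead of stopping at the first one."""
    result: Dict[str, List[str]] = {"started": [], "failed": []}
    for flow in flows:
        try:
            create_run(flow)
            result["started"].append(flow["name"])
        except Exception:
            # Log the traceback and move on to the next flow
            logger.exception("Could not create a flow run for %s", flow["name"])
            result["failed"].append(flow["name"])
    return result


# Stand-in for a real client call, used only to exercise the loop
def fake_create_run(flow: dict) -> str:
    if flow["name"] == "broken":
        raise RuntimeError("simulated API error")
    return "run-for-" + flow["id"]


summary = start_all(
    [
        {"name": "etl", "id": "a2"},
        {"name": "broken", "id": "x9"},
        {"name": "report", "id": "b1"},
    ],
    fake_create_run,
)
```

With a real client, `create_run` could be something like `lambda f: c.create_flow_run(flow_id=f["id"])`, and the `failed` list shows which flows to retry.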

Anna Geller

05/01/2022, 5:05 PM
nice, thanks for sharing! 🙌 and sorry to hear you need to troubleshoot Kubernetes cluster issues over the weekend, hope you'll get it resolved quickly