Zach Schumacher
05/01/2022, 3:26 PMAnna Geller
from prefect.tasks.prefect import create_flow_run
from prefect import Flow, unmapped, task, Client
@task
def get_all_flows_for_project():
query = """query {
flow(where: { project: { name: {_eq: "your_project_name"} } }) {
id
name
version
}
}"""
client = Client()
response = client.graphql(query)
return response
with Flow("xxx") as flow:
flows = get_all_flows_for_project()
create_flow_run.map(flow_name=flows, project_name=unmapped("your_project_name"))
Zach Schumacher
05/01/2022, 4:36 PMimport logging
from prefect import Client
from typing import TypedDict, List
from logging import getLogger
logger = getLogger(__name__)
class Flow(TypedDict):
name: str
id: str
def get_flows_by_project_name(c: Client, project_name: str) -> List[Flow]:
flows = c.graphql(
"""
query {
flow (
where: {
archived: { _eq: false },
project: { name: { _eq: "%s" } } }
)
{ name, id }
}
""" % project_name
)["data"]["flow"]
<http://logger.info|logger.info>(f"{len(flows)} flows found in {project_name}")
return flows
def create_flow_run(c: Client, flow: Flow):
<http://logger.info|logger.info>(f"Creating flow run for {flow}")
c.create_flow_run(flow["id"])
def start_all_flows_in_project(token: str, project_name: str):
c = Client(api_token=token)
flows = get_flows_by_project_name(c, project_name)
for flow in flows:
create_flow_run(c, flow)
Anna Geller