Ben Muller
12/05/2022, 12:19 AM
I wrote a `wait_for_flow_run` helper so that I can run subflows concurrently without introducing all the async code I was trying to avoid. I ended up using the REST API. Would there have been an easier way to go about this? It seems to do the job.
import time
import requests
from prefect import get_run_logger, task
from prefect.deployments import run_deployment
from aws_helpers import Ssm
@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
    """Block until a Prefect flow run finishes successfully, polling the Prefect REST API.

    Intended to be submitted (``wait_for_deployment.submit(...)``) for a run created with
    ``run_deployment(..., timeout=0)``, so several deployments can be awaited concurrently
    without writing async code.

    Args:
        flow_run_id: ID of the flow run to watch.
        log_polling_interval: Seconds to sleep between status polls (each poll is logged).
        max_run_time: Limit, in seconds, on the run's reported ``total_run_time``.

    Raises:
        Exception: if the run ends in FAILED, CRASHED or CANCELLED, or if its reported
            ``total_run_time`` exceeds ``max_run_time``.

    NOTE(review): ``total_run_time`` presumably stays 0 while the run is SCHEDULED/PENDING,
    so a run that never starts is not bounded by ``max_run_time`` — confirm.
    """
    # Terminal state types other than COMPLETED. Without CRASHED/CANCELLED here, such a run
    # never matches a terminal branch and the loop could keep polling indefinitely.
    unsuccessful_terminal_states = ("FAILED", "CRASHED", "CANCELLED")

    logger = get_run_logger()
    # Resolve credentials once, not on every poll.
    api_key = Ssm.get_secret("/prefect/api_key")
    api_url = Ssm.get_secret("/prefect/api_url")
    while True:
        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        state = flow_run["state_type"]
        logger.info(f"{flow_run['name']} : {state}")
        if state == "COMPLETED":
            break
        if state in unsuccessful_terminal_states:
            # The state object may be missing/null; don't let that mask the real failure.
            message = (flow_run.get("state") or {}).get("message")
            # For FAILED this yields exactly the original "Flow run failed: ..." text.
            raise Exception(f"Deployment: {flow_run['name']}, Flow run {state.lower()}: {message}")
        if flow_run["total_run_time"] > max_run_time:
            raise Exception(f"Flow run exceeded max run time of {max_run_time} seconds: marking as failed")
        time.sleep(log_polling_interval)
def request_prefect_api(path: str, api_key: str, api_url: str, timeout: float = 30):
    """GET ``{api_url}{path}`` from the Prefect REST API and return the decoded JSON body.

    Args:
        path: API path appended to ``api_url`` (e.g. ``/flow_runs/<id>``).
        api_key: Bearer token sent in the ``Authorization`` header.
        api_url: Base URL of the Prefect API.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        requests.HTTPError: on a non-2xx response, instead of returning the error payload
            (which callers would otherwise index as if it were the requested resource).
        requests.Timeout: if the server does not respond within ``timeout`` seconds.
    """
    url = f"{api_url}{path}"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    # Without a timeout, requests can block forever on a stalled connection.
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()
Anna Geller
12/05/2022, 8:22 AM
Ben Muller
12/05/2022, 8:30 AM
Anna Geller
12/05/2022, 8:34 AM
Ben Muller
12/05/2022, 8:47 AM
Anna Geller
12/05/2022, 10:10 AM
Ben Muller
12/05/2022, 10:13 AM
from prefect import flow
from prefect.deployments import run_deployment
@flow
def my_parent_flow():
    """Run the table-a and table-b deployments one after another, then the downstream task.

    With no ``timeout`` argument, each ``run_deployment`` call here waits for its run, so
    table-b (~2 min) only starts after table-a (~3 min): the downstream task waits ~5 min.
    """
    deployments_to_run = (
        ("table-a/default", "my-parent-flow-trigger/table-a"),  # I take 3 minutes
        ("table-b/default", "my-parent-flow-trigger/table-b"),  # I take 2 minutes
    )
    finished_runs = []
    for deployment_name, run_name in deployments_to_run:
        finished_runs.append(run_deployment(name=deployment_name, flow_run_name=run_name))
    my_task_that_reads_from_table_a_and_table_b(wait_for=finished_runs)
But with the task I created (`wait_for_deployment`), I can now do:
from prefect import flow
from prefect.deployments import run_deployment
from es_common.prefect_tasks import wait_for_deployment
@flow
def my_parent_flow():
    """Trigger table-a and table-b without blocking, wait on both concurrently, then continue.

    ``timeout=0`` makes ``run_deployment`` return without waiting for the run to finish.
    Each run is then watched by its own submitted ``wait_for_deployment`` task. The
    downstream task starts after the longer of the two (~3 min), and only once both
    runs have completed successfully.
    """
    deployments_to_trigger = (
        ("table-a/default", "my-parent-flow-trigger/table-a"),  # I take 3 minutes
        ("table-b/default", "my-parent-flow-trigger/table-b"),  # I take 2 minutes
    )
    waiters = []
    for deployment_name, run_name in deployments_to_trigger:
        triggered_run = run_deployment(name=deployment_name, flow_run_name=run_name, timeout=0)
        waiters.append(wait_for_deployment.submit(triggered_run.id))
    my_task_that_reads_from_table_a_and_table_b(wait_for=waiters)
Anna Geller
12/06/2022, 12:41 AM
Ben Muller
12/06/2022, 12:43 AM
Anna Geller
12/06/2022, 9:10 AM
Ben Muller
12/06/2022, 9:14 AM
Yaron Levi
12/06/2022, 2:32 PM
Ben Muller
12/06/2022, 6:32 PM
Anna Geller
12/09/2022, 2:42 AM