
Ben Muller

12/05/2022, 12:19 AM
I wrote my own hacked-together version of Prefect 1's wait_for_flow_run so that I can run subflows concurrently without introducing all the async stuff I was trying to avoid. I ended up using the REST API. Would there have been an easier way to go about this? It seems to do the job ✅
import time

import requests
from prefect import get_run_logger, task
from prefect.deployments import run_deployment

from aws_helpers import Ssm  # in-house helper (not shown) for reading secrets

@task
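def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
    """Poll a flow run through the Prefect REST API until it completes, fails, or runs longer than max_run_time seconds."""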
    logger = get_run_logger()

    api_key = Ssm.get_secret("/prefect/api_key")
    api_url = Ssm.get_secret("/prefect/api_url")

    while True:

        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        logger.info(f"{flow_run['name']} : {flow_run['state_type']}")

        state = flow_run["state_type"]

        if state == "COMPLETED":
            break
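        if state in ("FAILED", "CRASHED", "CANCELLED"):
            # treat every unsuccessful terminal state as a failure; otherwise the loop
            # could keep polling a crashed or cancelled run indefinitely
            raise Exception(f"Deployment: {flow_run['name']}, flow run {state}: {flow_run['state']['message']}")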
        if flow_run["total_run_time"] > max_run_time:
            raise Exception(f"Flow run exceeded max run time of {max_run_time} seconds: marking as failed")

        time.sleep(log_polling_interval)


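def request_prefect_api(path: str, api_key: str, api_url: str) -> dict:
    """GET a Prefect API endpoint and return the decoded JSON body."""
    url = f"{api_url}{path}"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    response = requests.get(url, headers=headers, timeout=30)  # don't hang forever on a stalled connection
    response.raise_for_status()  # surface 401/404 errors instead of failing later on a missing key
    return response.json()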
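The loop in wait_for_deployment mixes three concerns: fetching the state, deciding whether it is terminal, and enforcing a time limit. Below is a minimal sketch that separates the polling logic from the HTTP call, assuming a hypothetical `fetch_state` callable that returns a `state_type` string (for example by wrapping `request_prefect_api`). Treating CRASHED and CANCELLED as failures and using a local wall-clock deadline instead of the run's `total_run_time` are design choices of this sketch, not Prefect behavior. Injecting `sleep` and `clock` keeps it testable without real waiting.

```python
import time
from typing import Callable

# Assumed set of failure states; Prefect 2 state types include these names.
FAILURE_STATES = {"FAILED", "CRASHED", "CANCELLED"}


class FlowRunFailed(Exception):
    """Raised when the polled run ends in a failure-like state."""


def poll_until_complete(
    fetch_state: Callable[[], str],
    poll_interval: float = 25.0,
    max_wait: float = 1000.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call fetch_state() until it reports COMPLETED, a failure state, or max_wait elapses.

    fetch_state is hypothetical: any zero-argument callable returning a state_type
    string, e.g. lambda: request_prefect_api(...)["state_type"].
    """
    deadline = clock() + max_wait  # wall-clock budget measured locally (a choice of this sketch)
    while True:
        state = fetch_state()
        if state == "COMPLETED":
            return state
        if state in FAILURE_STATES:
            raise FlowRunFailed(f"flow run ended in state {state}")
        if clock() >= deadline:
            raise TimeoutError(f"flow run still {state} after {max_wait} seconds")
        sleep(poll_interval)


# Usage with a scripted sequence of states instead of real API calls.
states = iter(["SCHEDULED", "RUNNING", "COMPLETED"])
print(poll_until_complete(lambda: next(states), poll_interval=0))  # COMPLETED
```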

Anna Geller

12/05/2022, 8:22 AM
thanks for sharing! have you tried the run_deployment utility? I think you wanted to do it for ECS and IIRC I wrote this for you back then https://discourse.prefect.io/t/how-to-run-deployments-with-ecstask-infrastructure-block[…]f-serverless-ecs-containerized-flow-runs-concurrently/1929

Ben Muller

12/05/2022, 8:30 AM
Hey Anna, this is a bit of a different use case. I'm not scaling hundreds of flows, just running child flows from a main flow. It's just nice to have more fine-grained logging.

Anna Geller

12/05/2022, 8:34 AM
gotcha, you could also turn on DEBUG logs. But if there is something we could add to run_deployment to make it more useful to you, we would be open to a feature request. Was there something specific missing there, e.g. specific logs?
we have an open feature request for concurrent subflows, but here you're running concurrent deployments, and run_deployment was introduced for exactly that purpose (less code for you to maintain), so I wonder how we could extend it to help you here

Ben Muller

12/05/2022, 8:47 AM
Yeah, it was more about concurrent deployments, but different deployments. I want to trigger, say, 10 deployments concurrently and then, when they finish, do some other things with the data generated by those deployments. In 1.0 I did that with run_flow and wait_for_flow_run, but that doesn't seem achievable with run_deployment, because you either have to wait for it to complete and block execution (if you don't want to use async) or set timeout to zero and then not know the final state or when it completes. Does that make sense, or am I missing something?
So to answer your question, I guess the limitation is that I can't kick off multiple deployments concurrently and get their end states from the same flow.
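The pattern described here (trigger several deployments, then wait for all of their end states) can be sketched without async and without Prefect-specific APIs: start every run first, then poll each one on a worker thread and collect the final states. `trigger_run` and `get_state` are hypothetical stand-ins; in a real flow they might wrap `run_deployment(..., timeout=0).id` and a state lookup such as the REST helper above. The set of terminal states is an assumption of this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

# Assumed set of terminal states; polling a run stops once it reaches one.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


def trigger_all(names: List[str], trigger_run: Callable[[str], str]) -> Dict[str, str]:
    """Start every run without waiting for any of them; return name -> run id."""
    return {name: trigger_run(name) for name in names}


def wait_all(
    run_ids: Dict[str, str],
    get_state: Callable[[str], str],
    poll_interval: float = 5.0,
) -> Dict[str, str]:
    """Poll every run on its own thread until it is terminal; return name -> final state."""

    def wait_one(run_id: str) -> str:
        while True:
            state = get_state(run_id)
            if state in TERMINAL_STATES:
                return state
            time.sleep(poll_interval)

    with ThreadPoolExecutor(max_workers=max(1, len(run_ids))) as pool:
        futures = {name: pool.submit(wait_one, run_id) for name, run_id in run_ids.items()}
        return {name: future.result() for name, future in futures.items()}


# Usage with a fake backend: each run reports a scripted sequence of states.
scripted = {
    "run-a": iter(["RUNNING", "RUNNING", "COMPLETED"]),
    "run-b": iter(["PENDING", "FAILED"]),
}
run_ids = trigger_all(["table-a", "table-b"], lambda name: "run-" + name[-1])
print(wait_all(run_ids, lambda run_id: next(scripted[run_id]), poll_interval=0.01))
# {'table-a': 'COMPLETED', 'table-b': 'FAILED'}
```

Because triggering and waiting are separate calls, other work can run between them, and the caller still gets every end state.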

Anna Geller

12/05/2022, 10:10 AM
thanks. To get the end state, you need to poll for the state. Do you mean you'd like polling for the state to be decoupled from triggering the run? If so, why is this important? I've heard a lot of complaints in v1 that having those separated makes things more difficult, and this single function
if you would like to contribute that more decoupled run_deployment pattern, we are definitely open to it. Feel free to open a feature request or submit a PR 🙌 thanks again for sharing

Ben Muller

12/05/2022, 10:13 AM
I mean that I want to poll for the state but not block execution of the rest of the flow. I just updated the code, so I guess it makes a bit more sense.
I might not be making complete sense, so to be sure I'll send you a code example of what I mean tomorrow.
Ok @Anna Geller - here I go. Currently I want to do something like this:
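from prefect import flow, task
from prefect.deployments import run_deployment


@task
def my_task_that_reads_from_table_a_and_table_b():
    # placeholder for the downstream work that needs both tables
    ...

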
@flow
def my_parent_flow():

    table_a = run_deployment(  # I take 3 minutes
        name="table-a/default",
        flow_run_name="my-parent-flow-trigger/table-a",
    )

    table_b = run_deployment(  # I take 2 minutes
        name="table-b/default",
        flow_run_name="my-parent-flow-trigger/table-b",
    )

    # starts only after ~5 minutes: each run_deployment call blocks until its run finishes, so A and B run back to back
    my_task_that_reads_from_table_a_and_table_b(wait_for=[table_a, table_b])
But with the task I created (wait_for_deployment) I can now do:
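from prefect import flow, task
from prefect.deployments import run_deployment
from es_common.prefect_tasks import wait_for_deployment  # the wait_for_deployment task shared at the top of this thread


@task
def my_task_that_reads_from_table_a_and_table_b():
    # placeholder for the downstream work that needs both tables
    ...

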
@flow
def my_parent_flow():

    table_a = wait_for_deployment.submit(
        run_deployment(  # I take 3 minutes
            name="table-a/default",
            flow_run_name="my-parent-flow-trigger/table-a",
            timeout=0
        ).id
    )

    table_b = wait_for_deployment.submit(
        run_deployment(  # I take 2 minutes
            name="table-b/default",
            flow_run_name="my-parent-flow-trigger/table-b",
            timeout=0
        ).id
    )

    # starts after ~3 minutes (both runs overlap), once table_a and table_b have completed successfully
    my_task_that_reads_from_table_a_and_table_b(wait_for=[table_a, table_b])
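The two comments put numbers on the difference: back to back, the runs take 3 + 2 = 5 minutes before the downstream task starts; overlapped, they take max(3, 2) = 3 minutes. The scaled-down simulation below checks that arithmetic. `time.sleep` stands in for each deployment run (durations shrunk from minutes to tenths of a second) and a thread pool stands in for `.submit`; `fake_deployment_run` and `DURATIONS` are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical run durations, scaled down from minutes to tenths of a second.
DURATIONS = {"table-a": 0.3, "table-b": 0.2}


def fake_deployment_run(name: str) -> str:
    """Stand-in for a blocking deployment run: sleep for its duration, then report success."""
    time.sleep(DURATIONS[name])
    return "COMPLETED"


def run_sequentially() -> float:
    start = time.perf_counter()
    for name in DURATIONS:
        fake_deployment_run(name)  # each call blocks until that run is done
    return time.perf_counter() - start


def run_concurrently() -> float:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(DURATIONS)) as pool:
        list(pool.map(fake_deployment_run, DURATIONS))  # both runs overlap
    return time.perf_counter() - start


seq, conc = run_sequentially(), run_concurrently()
print(f"sequential ~{seq:.1f}s, concurrent ~{conc:.1f}s")
```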

Anna Geller

12/06/2022, 12:41 AM
Thanks for sharing, Ben 🙌

Ben Muller

12/06/2022, 12:43 AM
does that now make sense?

Anna Geller

12/06/2022, 9:10 AM
it does. It looks like the main advantages you gain are lineage plus running the A and B deployments concurrently, right? run_deployment blocks until completion by default, so my_task_that_reads_from_table_a_and_table_b wouldn't start before the previous deployment runs finish anyway
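The trade-off Ben ran into comes from run_deployment's `timeout` argument, which, as its docstring describes it, returns the flow run immediately when set to 0 and polls indefinitely when left as None (the default). The toy model below is not Prefect's implementation. It only mimics that described contract with a hypothetical in-memory run (`FakeFlowRun`, `toy_run_deployment`), to show why `timeout=0` hands back a run whose final state is still unknown.

```python
import time
from dataclasses import dataclass
from typing import List, Optional

TERMINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


@dataclass
class FakeFlowRun:
    """Hypothetical in-memory run that advances one scripted state per poll."""

    states: List[str]
    polls: int = 0

    @property
    def state(self) -> str:
        # Stay on the last scripted state once the script is exhausted.
        return self.states[min(self.polls, len(self.states) - 1)]

    def poll(self) -> str:
        self.polls += 1
        return self.state


def toy_run_deployment(
    run: FakeFlowRun, timeout: Optional[float] = None, poll_interval: float = 0.0
) -> FakeFlowRun:
    """Toy version of the described contract: timeout=0 returns at once, None waits indefinitely."""
    if timeout == 0:
        return run  # non-blocking: the caller does not learn the final state
    deadline = None if timeout is None else time.monotonic() + timeout
    while run.state not in TERMINAL_STATES:
        if deadline is not None and time.monotonic() >= deadline:
            break  # gave up waiting; the run may still be in progress
        time.sleep(poll_interval)
        run.poll()
    return run


print(toy_run_deployment(FakeFlowRun(["SCHEDULED", "RUNNING", "COMPLETED"]), timeout=0).state)  # SCHEDULED
print(toy_run_deployment(FakeFlowRun(["SCHEDULED", "RUNNING", "COMPLETED"])).state)  # COMPLETED
```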

Ben Muller

12/06/2022, 9:14 AM
that's it 🎯

Yaron Levi

12/06/2022, 2:32 PM
This might be super useful for us (and others) in the future. Maybe it's worth also putting it on the Discourse community (along with many other threads here 🙂)?

Ben Muller

12/06/2022, 6:32 PM
Point me to the discourse page you want it on and I'll add it there @Yaron Levi

Anna Geller

12/09/2022, 2:42 AM
Ben is G.O.A.T 🐐