# ask-marvin
e
@Marvin I want to kick off sub flow runs from a root flow run. However, I want to make sure I don't have too many children flow runs executing at the same time. What would be the best way to limit the number of concurrent flow runs in this case?
m
To limit the number of concurrent subflow runs started from a root flow in Prefect, you can use task concurrency limits or global concurrency limits. Task concurrency limits work through task tags: you set the maximum number of concurrent task runs allowed for tasks with a given tag. The limit applies across the whole system, not just within a single flow, so it is especially useful for tasks that become a bottleneck when run concurrently, such as database operations. The Task Concurrency Limits documentation covers this. Global concurrency limits control how many tasks, flows, or other operations can run simultaneously, which helps you manage resource usage and distribute load. They can also act as rate limits that throttle how quickly tasks are submitted: the `slot_decay_per_second` setting on a concurrency limit defines the rate at which occupied slots become available again. For more on this, see the Global Concurrency Limits and Rate Limits documentation. Set your limits with your system's capacity and the specifics of your workflow in mind, to avoid both unnecessary throttling and overload.
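To make the `slot_decay_per_second` idea concrete, here is a toy model in plain Python. It is not Prefect's implementation, and the `DecayingConcurrencyLimit` class and its methods are hypothetical: each acquisition occupies a slot, and occupied slots drain at `slot_decay_per_second`, so a burst is capped at the limit and further work is then throttled to a steady rate.

```python
class DecayingConcurrencyLimit:
    """Toy model of a concurrency limit whose occupied slots free up over time.

    Hypothetical illustration only: `limit` plays the role of the slot count and
    `slot_decay_per_second` the rate at which occupied slots are released.
    """

    def __init__(self, limit: int, slot_decay_per_second: float):
        self.limit = limit
        self.slot_decay_per_second = slot_decay_per_second
        self.occupied = 0.0
        self.last_update = 0.0

    def _decay(self, now: float) -> None:
        # Release slots in proportion to the time elapsed since the last update.
        elapsed = now - self.last_update
        self.occupied = max(0.0, self.occupied - elapsed * self.slot_decay_per_second)
        self.last_update = now

    def try_acquire(self, now: float) -> bool:
        """Take one slot at time `now` (in seconds) if one is free."""
        self._decay(now)
        if self.occupied + 1 <= self.limit:
            self.occupied += 1
            return True
        return False


limiter = DecayingConcurrencyLimit(limit=2, slot_decay_per_second=1.0)
print([limiter.try_acquire(0.0) for _ in range(3)])  # two slots, then throttled
```

Passing `now` explicitly keeps the sketch deterministic; a real caller would pass something like `time.monotonic()`.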
e
@Marvin so in this case, would I kick off the children flow runs from a task, and then limit the number of tasks in my root flow run?
m
Yes, exactly! In your root flow, a task is responsible for kicking off each child flow run. Give that task a specific tag and set a concurrency limit for that tag; the limit then caps how many child flow runs your root flow has in progress at the same time. To do this:
1. Tag the task that triggers the child flow run with a unique tag.
2. Set a concurrency limit for that tag.
If your root flow tries to run more of these tasks than the limit allows, the excess task runs wait for a free slot instead of starting immediately, so the number of concurrent child flow runs never exceeds what you want. This only holds if each task keeps running until its child flow run has finished. The relevant task in your root flow could look something like this:
```python
from prefect import flow, task

@task(tags=['child-flow-limit'])
def kick_off_child_flow():
    # Code to start a child flow run goes here. The task should not return
    # until that run has finished, or the tag limit won't cap the child runs.
    ...

@flow
def root_flow():
    # ... other flow logic ...
    child_flow_run = kick_off_child_flow()
    # ... other flow logic ...
```
Then configure a concurrency limit (via the UI or CLI) for the tag `child-flow-limit`, setting it to the maximum number of concurrent child flow runs you want. Refer to the Task Concurrency Limits and the Global Concurrency Limits and Rate Limits documentation to set this up properly.
e
@Marvin Ok. Then in my task, can you show me an example of how I might kick off the child flow run, and have the task only finish when the child flow run is also finished?
m
To kick off a child flow run from a task and ensure the task only completes once the child flow run has finished, you can call the task's `submit` method and then call `wait` on the resulting future. Tasks can still run concurrently, while the root flow waits for the child flow run to complete before moving on. Here's a simplified example:
```python
import time
from prefect import flow, task

# Define a task that starts a child flow run (represented here by a sleep)
@task
def start_and_wait_for_child_flow(t: int):
    # Code to kick off the child flow would go here
    # For the sake of this example, we just sleep for 't' seconds to simulate a child flow run
    print(f"Child flow running for {t} seconds...")
    time.sleep(t)
    print("Child flow has completed.")

# Define your root flow
@flow(log_prints=True)
def root_flow():
    # Start the child flow and wait for its completion
    child_flow_task = start_and_wait_for_child_flow.submit(5)  # Assume it runs for 5 seconds
    child_flow_task.wait()  # Block the root flow until the child flow task is complete

if __name__ == "__main__":
    root_flow()
```
In this example, the `start_and_wait_for_child_flow` task is where the actual logic for starting the child flow run belongs. Calling `submit` hands the task to the flow's task runner so it runs concurrently, and `wait` blocks until it has finished. Replace the sleep with the code you actually use to kick off your child flow runs. See the Prefect documentation on running tasks, and consider error handling and timeouts when waiting for child flows to complete.
e
@Marvin No, I specifically want an example where the task kicks off a child flow run, then waits until that flow run has either finished or failed. That way, putting a concurrency limit on the task would also mean that I don't have too many child flow runs.
k
you can't directly start a subflow from inside a task
m
It seems there might be a misunderstanding based on the previous code examples. Let's clarify things in terms of Prefect concepts. To kick off a child flow run from within a task and wait for it to finish (successfully or not) before proceeding, do the following inside your task function:
1. Use Prefect's client (`get_client()`) to create a new flow run of the child flow's deployment.
2. Using the flow run ID the client returns, poll the run until it reaches a final state (for example `Completed`, `Failed`, or `Crashed`).
Here's a conceptual example in Python using Prefect 2.x:
```python
import asyncio

from prefect import flow, task
from prefect.client.orchestration import get_client

@task
async def start_child_flow_and_wait(deployment_name: str):
    async with get_client() as client:
        # Look up the child deployment by "<flow-name>/<deployment-name>"
        deployment = await client.read_deployment_by_name(deployment_name)
        # Create a new flow run of that deployment; a worker picks it up
        flow_run = await client.create_flow_run_from_deployment(deployment.id)

        # Wait for the child flow run to reach a final state
        # (Completed, Failed, Crashed, or Cancelled)
        while True:
            flow_run = await client.read_flow_run(flow_run.id)
            if flow_run.state and flow_run.state.is_final():
                return flow_run
            await asyncio.sleep(5)  # Poll every 5 seconds (adjust as necessary)

@flow
async def parent_flow():
    # This task kicks off the child flow run and waits for it to finish.
    # "my-child-flow/my-deployment" is a placeholder deployment name.
    await start_child_flow_and_wait("my-child-flow/my-deployment")

@flow
def my_child_flow():
    # Define the child flow here; it must be deployed for the parent to start it
    pass
```
Adjust the polling interval (`await asyncio.sleep(5)`) to suit your use case, or use an existing Prefect utility if your version has one. Since you also want a concurrency limit, give the task that kicks off and waits for the child flow runs a tag, and set a concurrency limit for that tag. The number of these tasks (and therefore the number of child flow runs) running in parallel is then capped by that limit. Check Prefect's API reference for your version for the exact client methods that create flow runs and read their state, and adjust the example's client calls if they differ. If you need further clarification or assistance, feel free to ask.
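The polling step is the part worth getting right: stop on any terminal state (not just success or failure), and bound the wait. Here is a library-agnostic sketch; `wait_for_final_state` and its `read_state` argument are hypothetical, and the state names mirror Prefect 2.x state types.

```python
import time
from typing import Callable, Optional

# Terminal state types in Prefect 2.x: once a run reaches one, it will not change again.
FINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


def wait_for_final_state(
    read_state: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll `read_state` until it reports a final state and return that state.

    `read_state` is a hypothetical callable returning the run's current state
    type name; in a real task it would be backed by the Prefect client.
    """
    waited = 0.0  # approximate: counts only time spent sleeping
    while True:
        state = read_state()
        if state in FINAL_STATES:
            return state
        if timeout is not None and waited >= timeout:
            raise TimeoutError(f"run still {state} after {waited:.0f}s")
        sleep(poll_interval)
        waited += poll_interval
```

Treating crashes and cancellations as "done" matters here: a task that only looked for `Completed` or `Failed` would hang on a crashed child run and keep holding its concurrency slot.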
e
@Kevin Grismore I see. So should I manually keep track of the currently running child flow runs from my root flow, and only start a new one once one has finished? I do something similar with my tasks:
```python
import time
from typing import Callable, Dict, List, Optional

from prefect import get_run_logger


def prefect_async_submit(task_fn: Callable, task_args: List[Dict], callback_fn: Optional[Callable] = None):
    task_submits = []
    task_finishes = []

    logger = get_run_logger()
    logger.info("Starting deep dive insights flow")

    for submit_task_args in task_args:
        # Keep at most 5 submissions in flight
        while len(task_submits) == 5:
            time.sleep(1)
            logger.info("Waiting for tasks to complete")
            logger.info(task_submits)

            # Iterate over a copy: removing from a list while iterating it skips items
            for submit in list(task_submits):
                state = submit.get_state()
                if state.is_completed() or state.is_failed() or state.is_crashed():
                    task_finishes.append(submit)
                    task_submits.remove(submit)

                    if callback_fn:
                        callback_fn(task_submits, task_finishes)

        logger.info(f"Submitting task with args: {submit_task_args}")
        new_submit = task_fn(**submit_task_args)
        task_submits.append(new_submit)
        time.sleep(.25)

    # Wait for the remaining submissions to finish
    while len(task_finishes) < len(task_args):
        for submit in list(task_submits):
            state = submit.get_state()
            if state.is_completed() or state.is_failed() or state.is_crashed():
                task_finishes.append(submit)
                task_submits.remove(submit)

                if callback_fn:
                    callback_fn(task_submits, task_finishes)

        time.sleep(5)
        logger.info("Waiting for tasks to complete")
        logger.info(task_submits)
```
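If you do want the root flow to throttle itself, the same bookkeeping can be written without sleep-polling by blocking on `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)`. This is a stdlib-only sketch: `bounded_submit` and `parse` are hypothetical, and a thread pool stands in for Prefect's task runner, whose `PrefectFuture` objects are not `concurrent.futures` futures.

```python
import threading
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List, Optional, Set


def bounded_submit(
    submit: Callable[..., Future],
    task_args: List[Dict[str, Any]],
    max_in_flight: int = 5,
    callback_fn: Optional[Callable[[Future], None]] = None,
) -> List[Future]:
    """Submit one job per kwargs dict, keeping at most `max_in_flight` unfinished."""
    in_flight: Set[Future] = set()
    finished: List[Future] = []

    def record(done) -> None:
        for fut in done:
            finished.append(fut)
            if callback_fn:
                callback_fn(fut)

    for kwargs in task_args:
        if len(in_flight) >= max_in_flight:
            # Block until at least one job finishes instead of sleep-polling.
            done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
            record(done)
        in_flight.add(submit(**kwargs))

    # Drain the jobs that are still running.
    done, _ = wait(in_flight)
    record(done)
    return finished


# Usage: `parse` is a dummy job that also records peak concurrency.
_lock = threading.Lock()
_active = 0
peak = 0


def parse(n: int) -> int:
    global _active, peak
    with _lock:
        _active += 1
        peak = max(peak, _active)
    time.sleep(0.01)
    with _lock:
        _active -= 1
    return n * 2


with ThreadPoolExecutor(max_workers=20) as pool:
    futures = bounded_submit(
        lambda **kw: pool.submit(parse, **kw),
        [{"n": i} for i in range(12)],
        max_in_flight=5,
    )

print(sorted(f.result() for f in futures), "peak in flight:", peak)
```

The pool allows 20 workers, so the cap of 5 comes entirely from `bounded_submit`; the helper never mutates a collection while iterating over it.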
k
You might be able to do something similar provided your subflows are all async?
what does this do that a global concurrency limit doesn't do though?
e
Is there a way to set a global concurrency limit on a deployment or flow? If there is an easier way to do it I would love to explore that
Oh I see there is a work queue / pool level for flow runs
k
I'd start here if you haven't seen this before
yeah, there are a couple of different kinds of concurrency limits:
• on work pools and work queues, for flow runs
• tag-based concurrency limits, to limit task runs that have the same tag
• global concurrency limits, which work anywhere in Python, even outside Prefect flows
e
I'm pulling from all sync sources every hour or so, so I just want to make sure I don't have too many parse jobs running at the same time. But I also don't want to block manually triggered parse jobs. That's why I wanted to limit the number of child flow runs kicked off specifically from my scheduled root flow run, rather than globally.
k
by "child flow runs" do you mean in-process subflows, as in a flow function calling another flow function? or kicking off a run of a deployment from inside a flow?
e
Kicking off a run of a deployment from inside a flow
I guess I don't have to do it that way
but my utility functions currently create a new flow run using the Prefect client
k
you could also use `run_deployment`, which does the same thing provided it's called from inside a flow run
I think if I wanted to set a concurrency limit for a specific type of run, I'd make two deployments, like `parse_job_manual` and `parse_job_scheduled`. I'd put each deployment on a different work queue in the same work pool, each with a different concurrency limit
e
Yes I do this
```python
prefect_job_id = run_prefect_deployment(
    slug=safe_get_env_var("<deployment_id>"),
    params={

    }
)
```
k
that way, if `parse_job_scheduled` maxes out its work queue's concurrency limit, `parse_job_manual` would still run
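The per-queue behaviour described here can be pictured with a toy model. It is plain Python, not Prefect's worker logic; the `WorkQueue` class is hypothetical, and the queue names simply reuse the deployment names above. Each queue starts runs only while it has free slots, so a saturated scheduled queue leaves the manual queue unaffected. FIFO dispatch is a simplification: this thread doesn't establish the order in which Prefect picks up pending runs.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class WorkQueue:
    """Toy model of a work queue with its own concurrency limit."""
    name: str
    concurrency_limit: int
    running: List[str] = field(default_factory=list)
    waiting: List[str] = field(default_factory=list)

    def submit(self, run_name: str) -> None:
        self.waiting.append(run_name)
        self.dispatch()

    def dispatch(self) -> None:
        # Start waiting runs only while this queue has free slots.
        while self.waiting and len(self.running) < self.concurrency_limit:
            self.running.append(self.waiting.pop(0))

    def finish(self, run_name: str) -> None:
        self.running.remove(run_name)
        self.dispatch()


# Two queues in the same pool, each with its own limit.
scheduled = WorkQueue("parse_job_scheduled", concurrency_limit=2)
manual = WorkQueue("parse_job_manual", concurrency_limit=1)

for i in range(5):
    scheduled.submit(f"scheduled-{i}")
manual.submit("manual-0")

print(scheduled.running, scheduled.waiting)
print(manual.running)
```

Finishing a scheduled run frees a slot on that queue only, which is why the manual deployment's runs never wait behind the scheduled backlog.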
e
I didn't know you could do that, it sounds a bit complicated. I only have one work queue in each of my work pools
k
you can add new work queues easily from the UI or CLI
this isn't the only solution, but to me it's the simplest, most prefect-oriented and transparent solution, given that you can independently set and view concurrency limits for different modes of work in the same pool
e
How would I define that in the deployment.yaml? I'm guessing there is just another field I need to add like
```yaml
definitions:

  work_pool: &common_work_pool
    name: $PREFECT_WORK_POOL_NAME
    work_queue: ?
```
k
work_queue_name
but yes
e
Would the root flow that kicks off the parse jobs need to be in the same work_queue? I guess its probably good practice for it to be in the same queue anyway
And this way the flow runs would be pending until concurrency frees up, and this is all managed by the work queue right?
So I can just submit all of them at the same time
k
doesn't need to be in the same queue. flow runs in one work queue can start deployment runs in another work queue, or even another work pool for that matter
e
Does Prefect guarantee ordering for the pending flow runs?
By the way, this is super helpful. Thank you for your patience answering questions!
k
and yes, you can submit them all at the same time. technically they'd be in the "Late" state since they're scheduled but not picked up by a worker yet
prefect will handle all the queueing
e
Great. So I can just create another deployment using the same flow run code, but just aimed at a different work_queue. And then just submit all my auto flow runs from a root flow run.
k
yup!
e
Thank you! I will give that a go then, this was very helpful.
k
always happy to help. especially since you've been around and using prefect for a while!
e
@Marvin give me an example of a deployment yaml that creates a deployment that runs a flow every hour
m
To create a deployment that runs your flow every hour, you can use Prefect's scheduling system and define a cron schedule in the YAML configuration file for your deployment. The cron expression `0 * * * *` fires at the start of every hour. Here is an example YAML configuration for such a deployment:
```yaml
name: example-flow-deployment
flow_name: example-flow-name
entrypoint: path/to/your/flow.py:flow_function
parameters: {}
schedule:
  cron: "0 * * * *"
```
In this example:
- The `schedule` section specifies a cron expression: `"0 * * * *"` runs the flow at the start of every hour.
- `entrypoint` is the path to the Python script containing your flow definition, followed by a colon and the flow function's name.
- `flow_name` is the name you've given to your flow in your script.

Replace `path/to/your/flow.py:flow_function` with the actual file path and flow function, and `example-flow-name` with the name of your flow. Make sure Prefect is installed and configured to talk to Prefect Cloud or a Prefect server, since that determines where the CLI command applies the deployment from the YAML file:
```bash
prefect deployment apply your-deployment.yaml
```
Check the Prefect Schedules documentation for more information on setting up schedules for deployments.
e
@Marvin can you show me an example for Prefect 2? The current format I've been using looks like this:
```yaml
name: flows
prefect-version: 2.12.0

pull:
- prefect.deployments.steps.set_working_directory:
    directory: /

# the definitions section allows you to define reusable components for your deployments
definitions:
  tags: &common_tags

  work_pool: &common_work_pool
    name: $PREFECT_WORK_POOL_NAME
    job_variables:
      image: $BACKEND_IMAGE_URI
      env:
        <env vars here>
      tolerations:
        <infra related here>
      node_selector:
        "kube/nodetype": "asyncjobs"

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: $ENV
  tags: *common_tags
  schedule: null
  entrypoint: <entry point here>
  work_pool: *common_work_pool
```
m
Certainly! Based on the current format you're using for Prefect 2.x, an example of a deployment YAML file that runs a flow every hour would look something like this:
```yaml
name: flows
prefect-version: 2.12.0

pull:
- prefect.deployments.steps.set_working_directory:
    directory: /

definitions:
  tags: &common_tags
    - "example-tag"

  work_pool: &common_work_pool
    name: $PREFECT_WORK_POOL_NAME
    job_variables:
      image: $BACKEND_IMAGE_URI
      env:
        # <env vars here>
      tolerations:
        # <infra related here>
      node_selector:
        "kube/nodetype": "asyncjobs"

  # Here, we define a schedule: every hour, on minute 0
  schedule: &common_schedule
    cron: "0 * * * *"

deployments:
- name: $ENV
  tags: *common_tags
  schedule: *common_schedule
  entrypoint: <entry point here>
  work_pool: *common_work_pool
```
In this example:
- Under `definitions`, I've added a new entry, `schedule`, that uses the cron expression `"0 * * * *"`. This means the deployment will trigger the flow at the start of every hour.
- The `deployments` section then references `*common_schedule`, so the flow is executed hourly on that schedule.

Replace `<entry point here>`, `$BACKEND_IMAGE_URI`, and `$ENV` with the corresponding values for your deployment. `<entry point here>` should be the path to your flow file, including the flow function (e.g., `src/my_flow.py:my_flow`). `$BACKEND_IMAGE_URI` is the URI of the Docker image you want to use. Also replace placeholders like `# <env vars here>` with the actual environment variables and `# <infra related here>` with any necessary infrastructure settings. For more details on the schedule format and Prefect 2.x deployment definitions, see the Prefect Deployment documentation.
e
@Marvin Out of curiosity, how would I create a deployment that kicks off a flow run every five minutes?
m
To create a deployment that triggers a flow run every five minutes in Prefect 2.x, use the cron schedule `*/5 * * * *`, which fires every 5 minutes. Here is how you can modify your deployment YAML to use this five-minute schedule:
```yaml
name: flows
prefect-version: 2.12.0

pull:
- prefect.deployments.steps.set_working_directory:
    directory: /

definitions:
  tags: &common_tags
    - "example-tag"

  work_pool: &common_work_pool
    name: $PREFECT_WORK_POOL_NAME
    job_variables:
      image: $BACKEND_IMAGE_URI
      env:
        # <env vars here>
      tolerations:
        # <infra related here>
      node_selector:
        "kube/nodetype": "asyncjobs"

  # Schedule definition for every 5 minutes
  schedule: &common_schedule
    cron: "*/5 * * * *"

deployments:
- name: $ENV
  tags: *common_tags
  schedule: *common_schedule
  entrypoint: <entry point here>
  work_pool: *common_work_pool
```
Just like in the hourly example, replace `<entry point here>`, `$BACKEND_IMAGE_URI`, and `$ENV` with the appropriate values for your deployment. Keep the cron expression in `schedule` as `"*/5 * * * *"` so that your flow is triggered every five minutes. Make sure you replace all placeholder values in the YAML with your actual configuration before applying it.
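To sanity-check what a cron string means before putting it in a deployment, a tiny stdlib sketch can list the next few fire times. `next_runs` is a hypothetical helper that only understands the minute field in the two forms used here (`0` and `*/5`); Prefect's own cron schedules handle the full syntax.

```python
from datetime import datetime, timedelta
from typing import List


def next_runs(cron: str, after: datetime, count: int = 3) -> List[datetime]:
    """Next `count` fire times strictly after `after` for a minute-only cron string.

    Hypothetical helper: supports "M * * * *" and "*/N * * * *" only.
    """
    minute, *rest = cron.split()
    if rest != ["*", "*", "*", "*"]:
        raise ValueError("only the minute field may be set in this sketch")
    if minute.startswith("*/"):
        allowed = set(range(0, 60, int(minute[2:])))
    else:
        allowed = {int(minute)}

    runs: List[datetime] = []
    t = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while len(runs) < count:
        if t.minute in allowed:
            runs.append(t)
        t += timedelta(minutes=1)
    return runs


start = datetime(2024, 1, 1, 10, 7)
print(next_runs("0 * * * *", start, 2))    # hourly, on the hour
print(next_runs("*/5 * * * *", start, 3))  # every five minutes on the clock
```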
e
@Marvin how would I do it in this format? I'm using this to create deployments that run on my local machine:
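```python
import os

from prefect.deployments import Deployment

# `flow_code` is the @flow-decorated function being deployed (defined elsewhere)

if __name__ == "__main__":

    deployment: Deployment = Deployment.build_from_flow(
        flow=flow_code,
        name=os.getenv("PREFECT_ENV_NAME"),
        version="0.1",
        work_pool_name=os.getenv("PREFECT_WORK_POOL_NAME"),
        # overwrite=True
    )

    deployment.apply()
```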
m
To create a deployment with a schedule that runs every five minutes using Prefect 2.x in Python, you can use the `IntervalSchedule` class from Prefect's schedule schemas. Your snippet only needs to pass that schedule to the deployment. Here's how you could modify your code:
```python
from datetime import timedelta
from prefect.deployments import Deployment
from prefect.client.schemas.schedules import IntervalSchedule
import os

# Assuming `flow_code` is your flow function

if __name__ == "__main__":

    # Define the interval schedule to run every 5 minutes
    interval_schedule = IntervalSchedule(interval=timedelta(minutes=5))

    deployment: Deployment = Deployment.build_from_flow(
        flow=flow_code,
        name=os.getenv("PREFECT_ENV_NAME"),
        version="0.1",
        work_pool_name=os.getenv("PREFECT_WORK_POOL_NAME"),
        schedule=interval_schedule,
        # overwrite=True  # Uncomment if you want to overwrite existing deployments with the same name
    )

    deployment.apply()
```
In this example:
- An `IntervalSchedule` with a five-minute interval is created using `timedelta(minutes=5)`.
- This schedule is passed to the `Deployment.build_from_flow()` method via the `schedule` keyword.

Running this script creates the deployment with that schedule; a worker polling the work pool then executes the flow every five minutes on your local machine. Remember to set the necessary environment variables (`PREFECT_ENV_NAME`, `PREFECT_WORK_POOL_NAME`) before running the script.
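An `IntervalSchedule` and the `*/5 * * * *` cron string are not quite the same thing: in Prefect 2.x an interval schedule counts forward from its `anchor_date`, while cron fires on wall-clock minutes divisible by 5. A stdlib sketch of the interval arithmetic (the `interval_runs` helper is hypothetical) shows the two only coincide when the anchor sits on a 5-minute boundary; if clock-aligned runs matter, pass an aligned `anchor_date` or use a cron schedule.

```python
from datetime import datetime, timedelta
from typing import List


def interval_runs(anchor: datetime, interval: timedelta, after: datetime, count: int = 3) -> List[datetime]:
    """Next `count` times of the form anchor + k * interval strictly after `after`."""
    k = max((after - anchor) // interval + 1, 0)  # first multiple past `after`
    return [anchor + (k + i) * interval for i in range(count)]


after = datetime(2024, 1, 1, 10, 7)
five = timedelta(minutes=5)
print(interval_runs(datetime(2024, 1, 1, 0, 0), five, after))  # aligned anchor: :10, :15, :20
print(interval_runs(datetime(2024, 1, 1, 0, 2), five, after))  # offset anchor: :12, :17, :22
```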
k
ignore Marvin on that one. Actually, I was too quick to judge; this might be correct
e
Is this the right path?
```python
from prefect.client.schemas.schedules import IntervalSchedule
```
k
yeah the schedule classes are under client schemas
e
Got it working locally and in dev! Thank you @Kevin Grismore