Eric
03/29/2024, 2:19 PMMarvin
03/29/2024, 2:19 PMrate limits, which function to throttle the submission of tasks and ensure tasks are submitted at a controlled pace. The slot_decay_per_second setting on the concurrency limit defines the rate at which slots for task execution become available again. For more on this, check out the documentation here: Global Concurrency Limits and Rate Limits.
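To make the slot decay idea concrete, here is a minimal standard-library sketch of a limit whose occupied slots free up at a fixed rate. It is a toy model, not Prefect's implementation: the DecayingLimit class and its try_acquire method are hypothetical names, and time is passed in explicitly so the behaviour is easy to follow.

```python
from dataclasses import dataclass


@dataclass
class DecayingLimit:
    """Toy model of a limit whose occupied slots free up at a fixed rate (hypothetical)."""

    limit: int                    # total number of slots
    slot_decay_per_second: float  # slots released per second
    active_slots: float = 0.0     # slots currently occupied
    last_update: float = 0.0      # time of the last decay calculation

    def _apply_decay(self, now: float) -> None:
        # Release slots in proportion to the time elapsed since the last update
        elapsed = now - self.last_update
        self.active_slots = max(0.0, self.active_slots - elapsed * self.slot_decay_per_second)
        self.last_update = now

    def try_acquire(self, now: float, slots: int = 1) -> bool:
        # Occupy slots if there is room after decay, otherwise refuse
        self._apply_decay(now)
        if self.active_slots + slots <= self.limit:
            self.active_slots += slots
            return True
        return False


limiter = DecayingLimit(limit=2, slot_decay_per_second=0.5)
# Three attempts at t=0 and one more at t=2 seconds
results = [limiter.try_acquire(now=t) for t in (0.0, 0.0, 0.0, 2.0)]
print(results)
```

With a limit of 2 and a decay of 0.5 slots per second, the third attempt at t=0 is refused, and one slot has become available again by t=2.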
Remember to set up your limits taking your system's capabilities and the specifics of your workflow into account to avoid undesired throttling or overloading.Eric
03/29/2024, 2:20 PMMarvin
03/29/2024, 2:20 PMpython
from prefect import flow, task


@task(tags=['child-flow-limit'])
def kick_off_child_flow():
    # Code to start a child flow run
    ...


@flow
def root_flow():
    # ... other flow logic ...
    child_flow_run = kick_off_child_flow()
    # ... other flow logic ...
Then configure your concurrency limit (via the UI or CLI) to apply to the tag child-flow-limit, setting it to the maximum number of concurrent child flow executions desired.
Please remember to refer to the Task Concurrency Limits and Global Concurrency Limits and Rate Limits to set this up properly.Eric
03/29/2024, 2:21 PMMarvin
03/29/2024, 2:22 PMsubmit method for the task, followed by calling the wait method on the resulting future. This allows for concurrent task execution while still ensuring that the task awaits the completion of the child flow run.
Here's a simplified example:
python
import time

from prefect import flow, task


# Define a task that starts a child flow run (represented here by a sleep)
@task
def start_and_wait_for_child_flow(t: int):
    # Code to kick off the child flow would go here
    # For the sake of this example, we just sleep for 't' seconds to simulate a child flow run
    print(f"Child flow running for {t} seconds...")
    time.sleep(t)
    print("Child flow has completed.")


# Define your root flow
@flow(log_prints=True)
def root_flow():
    # Start the child flow and wait for its completion
    child_flow_task = start_and_wait_for_child_flow.submit(5)  # Assume it runs for 5 seconds
    child_flow_task.wait()  # Block the root flow until the child flow task is complete


if __name__ == "__main__":
    root_flow()
In this example, the start_and_wait_for_child_flow task should include the actual logic for initiating the child flow run. The submit method hands the task to the task runner and returns a future right away, and wait blocks until that task has finished.
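The submit-then-wait pattern can be tried without Prefect using Python's standard library. The sketch below uses ThreadPoolExecutor as a stand-in for a task runner; simulated_child_flow is a hypothetical placeholder for the work that starts a child flow run and waits for it.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def simulated_child_flow(seconds: float) -> str:
    """Stand-in for work that starts a child flow run and waits for it."""
    time.sleep(seconds)
    return f"finished after {seconds}s"


with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns immediately with a future, so both calls run concurrently
    first = pool.submit(simulated_child_flow, 0.2)
    second = pool.submit(simulated_child_flow, 0.1)
    # result() blocks until the corresponding call has finished
    outcomes = [first.result(), second.result()]

print(outcomes)
```

Submitting everything first and collecting results afterwards is what lets the calls overlap; calling result() right after each submit() would run them one at a time.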
Please replace the sleep logic with the actual code you would use to kick off your child flow runs.
Refer to the Prefect documentation for Running Tasks, and consider proper error handling and timeouts when waiting for child flows to complete.
Eric
03/29/2024, 2:23 PMKevin Grismore
03/29/2024, 2:23 PMMarvin
03/29/2024, 2:23 PMget_client (the Prefect 2.x client) to create a new flow run for the child flow.
2. Poll the status of the flow run until it reaches a final state (such as Completed, Failed, or Crashed) using the flow run ID returned by the client.
Here's a conceptual example in Python using Prefect 2.x concepts:
python
import asyncio

from prefect import flow, task
from prefect.client.orchestration import get_client


@task
async def start_child_flow_and_wait(deployment_id: str):
    async with get_client() as client:
        # Create a new child flow run from the child flow's deployment
        flow_run = await client.create_flow_run_from_deployment(deployment_id)
        # Wait for the child flow run to reach a final state
        while True:
            flow_run = await client.read_flow_run(flow_run.id)
            if flow_run.state is not None and flow_run.state.is_final():
                break
            await asyncio.sleep(5)  # Poll every 5 seconds (adjust as necessary)


@flow
async def parent_flow():
    # This task will kick off the child flow and wait for completion
    # "<child deployment id>" is a placeholder for the ID of my_child_flow's deployment
    await start_child_flow_and_wait(deployment_id="<child deployment id>")


@flow
def my_child_flow():
    # Define the child flow here
    pass
Please adjust the polling mechanism (await asyncio.sleep(5)) as needed for your use case or use an existing Prefect utility if available for your version.
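One way to keep the polling interval and an overall timeout adjustable is to pull the loop into a small helper. The sketch below uses only asyncio; wait_for_final_state, the read_state callable, and the state names in FINAL_STATES are illustrative assumptions, not Prefect APIs.

```python
import asyncio
from typing import Awaitable, Callable

# Assumed names for states that end a run (illustrative only)
FINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


async def wait_for_final_state(
    read_state: Callable[[], Awaitable[str]],
    poll_interval: float = 5.0,
    timeout: float = 600.0,
) -> str:
    """Poll read_state() until it returns a final state or the timeout expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        state = await read_state()
        if state in FINAL_STATES:
            return state
        # Give up if the next poll would land past the deadline
        if loop.time() + poll_interval > deadline:
            raise TimeoutError(f"still in state {state!r} after {timeout} seconds")
        await asyncio.sleep(poll_interval)


async def demo() -> str:
    # Fake state source: two non-final states, then a final one
    states = iter(["PENDING", "RUNNING", "COMPLETED"])

    async def read_state() -> str:
        return next(states)

    return await wait_for_final_state(read_state, poll_interval=0.01, timeout=1.0)


final_state = asyncio.run(demo())
print(final_state)
```

In a real flow, read_state would wrap whatever client call returns the child run's current state.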
Given that your constraint requirement also involves setting concurrency limits, ensure the task that kicks off and waits for the child flow runs has a tag associated with it, and a concurrency limit is set for that tag. This way, the number of tasks (and thus the number of child flows) running in parallel will be capped by the concurrency limit.
Please refer to Prefect's API and updated documentation for specifics on how to create flow runs using the client and how to monitor their state. You may need to adjust the example's client calls for creating and reading flow runs based on the methods actually available in your version of Prefect.
If you need further clarification or assistance, feel free to ask.Eric
03/29/2024, 2:27 PM
import time
from typing import Callable, List, Optional

from prefect import get_run_logger


def prefect_async_submit(task_fn, task_args: List, callback_fn: Optional[Callable] = None):
    task_submits = []
    task_finishes = []
    logger = get_run_logger()
    logger.info("Starting deep dive insights flow")
    for submit_task_args in task_args:
        # Hold off submitting while five submissions are still in flight
        while len(task_submits) == 5:
            time.sleep(1)
            logger.info("Waiting for tasks to complete")
            logger.info(task_submits)
            # Iterate over a copy because finished items are removed from the list
            for submit in list(task_submits):
                if submit.get_state().is_completed() or submit.get_state().is_failed() or submit.get_state().is_crashed():
                    task_finishes.append(submit)
                    task_submits.remove(submit)
                    if callback_fn:
                        callback_fn(task_submits, task_finishes)
        logger.info(f"Submitting task with args: {submit_task_args}")
        new_submit = task_fn(**submit_task_args)
        task_submits.append(new_submit)
        time.sleep(.25)
    # Drain the submissions that are still running
    while len(task_finishes) < len(task_args):
        for submit in list(task_submits):
            if submit.get_state().is_completed() or submit.get_state().is_failed() or submit.get_state().is_crashed():
                task_finishes.append(submit)
                task_submits.remove(submit)
                if callback_fn:
                    callback_fn(task_submits, task_finishes)
        time.sleep(5)
        logger.info("Waiting for tasks to complete")
        logger.info(task_submits)
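For comparison, the same "at most five in flight" idea can be sketched with the standard library alone, blocking on the first finished future instead of sleeping and re-checking. This assumes plain callables and threads rather than Prefect tasks; run_bounded and square are hypothetical names.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Any, Callable, Dict, List


def run_bounded(
    task_fn: Callable[..., Any],
    task_args: List[Dict[str, Any]],
    max_in_flight: int = 5,
) -> List[Any]:
    """Call task_fn once per kwargs dict with at most max_in_flight calls pending."""
    results = []
    in_flight = set()
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        for kwargs in task_args:
            # Block until a slot frees up instead of sleeping and re-checking
            if len(in_flight) >= max_in_flight:
                done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
                results.extend(f.result() for f in done)
            in_flight.add(pool.submit(task_fn, **kwargs))
        # Drain whatever is still running
        done, _ = wait(in_flight)
        results.extend(f.result() for f in done)
    return results


def square(n: int) -> int:
    time.sleep(0.01)
    return n * n


squares = run_bounded(square, [{"n": i} for i in range(12)], max_in_flight=5)
print(sorted(squares))
```

Results arrive in completion order, which is why the example sorts them before printing.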
Kevin Grismore
03/29/2024, 2:38 PMKevin Grismore
03/29/2024, 2:38 PMEric
03/29/2024, 2:41 PMEric
03/29/2024, 2:42 PMKevin Grismore
03/29/2024, 2:42 PMKevin Grismore
03/29/2024, 2:43 PMEric
03/29/2024, 2:45 PMKevin Grismore
03/29/2024, 2:46 PMEric
03/29/2024, 2:47 PMEric
03/29/2024, 2:47 PMEric
03/29/2024, 2:48 PMKevin Grismore
03/29/2024, 2:48 PMrun_deployment which does the same thing provided it's called from inside a flow run
Kevin Grismore
03/29/2024, 2:50 PMparse_job_manual and parse_job_scheduled. I'd put each deployment on a different work queue in the same work pool, each with different concurrency limits
Eric
03/29/2024, 2:50 PM
prefect_job_id = run_prefect_deployment(
    slug=safe_get_env_var("<deployment_id>"),
    params={
    }
)
Kevin Grismore
03/29/2024, 2:50 PMparse_job_scheduled maxes out its work queue concurrency limit, parse_job_manual would still run
Eric
03/29/2024, 2:51 PMKevin Grismore
03/29/2024, 2:52 PMKevin Grismore
03/29/2024, 2:54 PMEric
03/29/2024, 2:55 PM
definitions:
  work_pool: &common_work_pool
    name: $PREFECT_WORK_POOL_NAME
    work_queue: ?
Kevin Grismore
03/29/2024, 2:56 PMwork_queue_name
Kevin Grismore
03/29/2024, 2:56 PMEric
03/29/2024, 2:56 PMEric
03/29/2024, 2:57 PMEric
03/29/2024, 2:57 PMKevin Grismore
03/29/2024, 2:57 PMEric
03/29/2024, 2:58 PMEric
03/29/2024, 2:58 PMKevin Grismore
03/29/2024, 2:58 PMKevin Grismore
03/29/2024, 2:58 PMEric
03/29/2024, 2:59 PMKevin Grismore
03/29/2024, 2:59 PMEric
03/29/2024, 2:59 PMKevin Grismore
03/29/2024, 3:00 PMEric
03/30/2024, 10:52 PMMarvin
03/30/2024, 10:52 PMyaml
version: 1
type: deployment
name: example-flow-deployment
schedule:
  type: cron
  cron: "0 * * * *"
entrypoint: "path/to/your/flow.py:flow_function"
specification:
  flow_name: "example-flow-name"
parameters: {}
In this example:
- The schedule section uses a cron expression: "0 * * * *" indicates the flow should run at the beginning of every hour.
- entrypoint should be the path to the Python script containing your flow definition followed by a colon and the flow's function name.
- You also need to specify the flow_name, which is the name you've given to your flow in your script.
Please replace path/to/your/flow.py:flow_function with the actual file path and flow function name, and example-flow-name with the name you have for your flow in your code.
Ensure you have Prefect installed and set up to interact with Prefect Cloud or Prefect Server, as this affects how you will run the Prefect CLI commands to apply the deployment from the YAML file, like so:
bash
prefect deployment apply your-deployment.yaml
Check the Prefect Schedules documentation for more information on setting up schedules for deployments.Eric
03/30/2024, 10:55 PM
name: flows
prefect-version: 2.12.0
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /
# the definitions section allows you to define reusable components for your deployments
definitions:
  tags: &common_tags
  work_pool: &common_work_pool
    name: $PREFECT_WORK_POOL_NAME
    job_variables:
      image: $BACKEND_IMAGE_URI
      env:
        <env vars here>
      tolerations:
        <infra related here>
      node_selector:
        "kube/nodetype": "asyncjobs"
# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: $ENV
  tags: *common_tags
  schedule: null
  entrypoint: <entry point here>
  work_pool: *common_work_pool
Marvin
03/30/2024, 10:55 PMyaml
name: flows
prefect-version: 2.12.0
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /
definitions:
  tags: &common_tags
    - "example-tag"
  work_pool: &common_work_pool
    name: $PREFECT_WORK_POOL_NAME
    job_variables:
      image: $BACKEND_IMAGE_URI
      env:
        # <env vars here>
      tolerations:
        # <infra related here>
      node_selector:
        "kube/nodetype": "asyncjobs"
  # Here, we define a schedule. Every hour on the 0th minute
  schedule: &common_schedule
    cron: "0 * * * *"
deployments:
- name: $ENV
  tags: *common_tags
  schedule: *common_schedule
  entrypoint: <entry point here>
  work_pool: *common_work_pool
In this example:
- Under definitions, I've added a new entry, schedule, that uses the cron syntax "0 * * * *". This means the deployment will trigger the flow to run every hour.
- The deployments section then references *common_schedule for the flow to be executed hourly based on the defined schedule.
Replace <entry point here>, $BACKEND_IMAGE_URI, and $ENV with the corresponding values for your deployment. <entry point here> should be the path to your flow file, including the flow function (e.g., src/my_flow.py:my_flow). The $BACKEND_IMAGE_URI will be the URI of the Docker image you wish to use.
Make sure to also replace the placeholders like # <env vars here> with the actual environment variables and # <infra related here> with any necessary infrastructure specifics.
For more details on the schedule format and Prefect 2.x deployment definitions, you may refer to the Prefect Deployment documentation.Eric
03/30/2024, 11:09 PMMarvin
03/30/2024, 11:09 PM*/5 * * * *, which specifies that the event should occur every 5 minutes.
Here is how you can modify your deployment YAML to include this five-minute interval:
yaml
name: flows
prefect-version: 2.12.0
pull:
- prefect.deployments.steps.set_working_directory:
    directory: /
definitions:
  tags: &common_tags
    - "example-tag"
  work_pool: &common_work_pool
    name: $PREFECT_WORK_POOL_NAME
    job_variables:
      image: $BACKEND_IMAGE_URI
      env:
        # <env vars here>
      tolerations:
        # <infra related here>
      node_selector:
        "kube/nodetype": "asyncjobs"
  # Schedule definition for every 5 minutes
  schedule: &common_schedule
    cron: "*/5 * * * *"
deployments:
- name: $ENV
  tags: *common_tags
  schedule: *common_schedule
  entrypoint: <entry point here>
  work_pool: *common_work_pool
Just like in the hourly example, replace <entry point here>, $BACKEND_IMAGE_URI, and $ENV with the appropriate values for your deployment. The cron expression in the schedule should stay as "*/5 * * * *" to ensure that your flow is being triggered every five minutes.
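To see why "0 * * * *" fires hourly and "*/5 * * * *" fires every five minutes, it helps to expand the first (minute) field by hand. The helper below is a small illustrative sketch, not a full cron parser; expand_minute_field is a hypothetical name, and it only handles '*', single numbers, ranges, steps on '*' or a range, and comma-separated lists.

```python
from typing import List


def expand_minute_field(field: str) -> List[int]:
    """Expand a cron minute field into the sorted list of minutes it matches."""
    minutes = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = 0, 59
        elif "-" in base:
            low, high = base.split("-")
            start, end = int(low), int(high)
        else:
            # A bare number matches only that minute
            start = end = int(base)
        minutes.update(range(start, end + 1, step))
    return sorted(minutes)


# The minute field is the first of the five space-separated cron fields
hourly = expand_minute_field("0 * * * *".split()[0])
every_five = expand_minute_field("*/5 * * * *".split()[0])
print(hourly)
print(every_five)
```

The hourly expression matches only minute 0, while the step form matches minutes 0, 5, 10, and so on up to 55.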
Please make sure you replace all placeholder values in the YAML with the actual configuration for your deployment before applying it.Eric
03/30/2024, 11:10 PM
if __name__ == "__main__":
    deployment: Deployment = Deployment.build_from_flow(
        flow=flow_code,
        name=os.getenv("PREFECT_ENV_NAME"),
        version="0.1",
        work_pool_name=os.getenv("PREFECT_WORK_POOL_NAME"),
        # overwrite=True
    )
    deployment.apply()
Marvin
03/30/2024, 11:11 PMIntervalSchedule class from Prefect's schedule module to define your schedule. The code snippet you provided would need to be expanded to include the schedule.
Here's how you could modify your code:
python
import os
from datetime import timedelta

from prefect.client.schemas.schedules import IntervalSchedule
from prefect.deployments import Deployment

# Assuming `flow_code` is your flow function
if __name__ == "__main__":
    # Define the interval schedule to run every 5 minutes
    interval_schedule = IntervalSchedule(interval=timedelta(minutes=5))
    deployment: Deployment = Deployment.build_from_flow(
        flow=flow_code,
        name=os.getenv("PREFECT_ENV_NAME"),
        version="0.1",
        work_pool_name=os.getenv("PREFECT_WORK_POOL_NAME"),
        schedule=interval_schedule,
        # overwrite=True  # Uncomment if you want to overwrite existing deployments with the same name
    )
    deployment.apply()
In this example:
- An IntervalSchedule with a five-minute interval is created using timedelta(minutes=5).
- This schedule is passed to the Deployment.build_from_flow() method using the schedule keyword.
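As a rough illustration of what a fixed-interval schedule means, the sketch below computes upcoming run times on a five-minute grid using only the standard library. It is not Prefect's scheduling code; next_run_times and the anchor time are assumptions made for the example.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def next_run_times(anchor: datetime, interval: timedelta, after: datetime, count: int) -> List[datetime]:
    """Return the first `count` times strictly after `after` on the grid anchor + k * interval."""
    elapsed = after - anchor
    # Whole intervals already passed, plus one to move strictly past `after`
    steps = max(0, elapsed // interval + 1)
    return [anchor + (steps + i) * interval for i in range(count)]


# Hypothetical anchor: the grid starts at 23:00 UTC
anchor = datetime(2024, 3, 30, 23, 0, tzinfo=timezone.utc)
now = datetime(2024, 3, 30, 23, 11, tzinfo=timezone.utc)
upcoming = next_run_times(anchor, timedelta(minutes=5), after=now, count=3)
print([t.strftime("%H:%M") for t in upcoming])
```

Unlike a cron expression, an interval is measured from an anchor time, so moving the anchor shifts every run by the same amount.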
By running this script, the deployment will be created with the specified schedule, and flow runs will be scheduled every five minutes for the work pool named in PREFECT_WORK_POOL_NAME. Remember to set the necessary environment variables (PREFECT_ENV_NAME, PREFECT_WORK_POOL_NAME) before running the script.
Kevin Grismore
03/30/2024, 11:16 PMEric
03/30/2024, 11:16 PMfrom prefect.client.schemas.schedules import IntervalSchedule
Eric
03/30/2024, 11:17 PMKevin Grismore
03/30/2024, 11:17 PMEric
04/01/2024, 2:56 PM