Sergey Moskvin
01/22/2025, 10:05 AM
Marvin
01/22/2025, 10:12 AM
1. Use `DeploymentTrigger` to set up conditions under which one flow triggers another. This involves specifying the expected state of a related flow or resource. For example, you can configure a downstream flow to trigger when an upstream flow completes successfully.
2. Python Implementation: In Python, you define the flows and attach triggers when you deploy them. Both `flow.deploy` and `flow.serve` (used in the snippet below) accept a `triggers` list built from Prefect's event schemas.
Here's a snippet from the Prefect documentation for reference:
```python
from prefect import flow, tags
from prefect.events.schemas import DeploymentTrigger

TAG_NAME = "foobar"


@flow
def upstream():
    pass


@flow
def downstream():
    pass


if __name__ == "__main__":
    # Run the upstream flow three times under the tag the trigger matches on
    with tags(TAG_NAME):
        upstream()
        upstream()
        upstream()

    # Serve downstream; the trigger fires after 3 matching Completed events
    downstream.serve(
        name="downstream-of-tagged-upstream",
        triggers=[
            DeploymentTrigger(
                expect={"prefect.flow-run.Completed"},
                match_related=[
                    {"prefect.resource.role": "tag", "prefect.resource.id": f"prefect.tag.{TAG_NAME}"},
                    {"prefect.resource.role": "flow", "prefect.resource.name": "upstream"},
                ],
                threshold=3,
            )
        ],
    )
```
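To make the `expect` / `match_related` / `threshold` combination concrete, here is a toy, stdlib-only model of how such a trigger could count events. It is not Prefect's implementation: `Event` and `ToyTrigger` are hypothetical names, it assumes every filter in the `match_related` list must be satisfied by some related resource, and it ignores the time window (`within`) that real triggers also support, since those are evaluated server-side.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    # Hypothetical stand-in for a Prefect event: a name plus related resources
    event: str
    related: list[dict[str, str]] = field(default_factory=list)


@dataclass
class ToyTrigger:
    expect: set[str]
    match_related: list[dict[str, str]]
    threshold: int
    count: int = 0

    def _matches(self, ev: Event) -> bool:
        if ev.event not in self.expect:
            return False
        # Each filter must be satisfied by at least one related resource
        return all(
            any(all(r.get(k) == v for k, v in spec.items()) for r in ev.related)
            for spec in self.match_related
        )

    def observe(self, ev: Event) -> bool:
        """Count a matching event; return True when the threshold is reached."""
        if not self._matches(ev):
            return False
        self.count += 1
        if self.count >= self.threshold:
            self.count = 0  # reset after firing
            return True
        return False


TAG = "foobar"
related = [
    {"prefect.resource.role": "tag", "prefect.resource.id": f"prefect.tag.{TAG}"},
    {"prefect.resource.role": "flow", "prefect.resource.name": "upstream"},
]
trigger = ToyTrigger(expect={"prefect.flow-run.Completed"}, match_related=related, threshold=3)
completed = Event("prefect.flow-run.Completed", related)
fired = [trigger.observe(completed) for _ in range(3)]
print(fired)  # [False, False, True]
```

Only the third matching completion fires the trigger; a `Failed` event or an untagged run would not be counted at all.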
For more details, you can explore the Prefect documentation and related resources:
- Prefect Deployment Example
- Tagged Upstream Flow Example
If you're using Prefect 3.x, the API may differ slightly, so check the documentation for your version.
Sergey Moskvin
01/22/2025, 10:22 AM
Marvin
01/22/2025, 10:22 AM
You can call the `run_deployment` function in a callback that is executed when the upstream flow completes.
Here's an example of how you can set this up:
```python
from prefect import flow
from prefect.deployments import run_deployment


def trigger_downstream(flow, flow_run, state):
    # timeout=0 returns as soon as the downstream run is created,
    # without waiting for it to finish
    run_deployment("downstream/deployment-b", parameters={"foo": "bar"}, timeout=0)


@flow(log_prints=True, on_completion=[trigger_downstream])
def upstream():
    print("Doing some work")


@flow(log_prints=True)
def downstream(foo: str):
    assert foo == "bar"
```
In this example:
- The `upstream` flow is defined with an `on_completion` callback that calls `trigger_downstream`.
- The `trigger_downstream` function uses `run_deployment` to trigger the downstream flow's deployment, `"downstream/deployment-b"`, with specific parameters.
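The hook mechanism can be pictured with a small stdlib model: a decorator runs the function and, if it returns normally, calls each `on_completion` callback with `(flow, flow_run, state)`, the same argument order as `trigger_downstream` above. Everything here (`toy_flow`, the string state, the `launched` list) is hypothetical; Prefect's engine also records states server-side and supports other hooks such as `on_failure`.

```python
import functools
from typing import Any, Callable, List, Optional

launched: List[tuple] = []  # records the deployments our toy hook "launches"


def toy_flow(on_completion: Optional[List[Callable]] = None):
    """Hypothetical stand-in for @flow(on_completion=[...])."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any):
            flow_run = {"name": f"{fn.__name__}-run"}  # fake flow-run metadata
            result = fn(*args, **kwargs)  # an exception here skips the hooks
            state = "Completed"
            for hook in on_completion or []:
                hook(wrapper, flow_run, state)
            return result
        return wrapper
    return decorator


def trigger_downstream(flow, flow_run, state):
    # Where the real hook calls run_deployment(..., timeout=0), we only record it
    launched.append(("downstream/deployment-b", {"foo": "bar"}))


@toy_flow(on_completion=[trigger_downstream])
def upstream():
    print("Doing some work")


upstream()
print(launched)  # [('downstream/deployment-b', {'foo': 'bar'})]
```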
For more detailed information, you can refer to the example in the Prefect documentation: Chain Example.
This setup lets the upstream flow trigger the downstream flow directly upon its completion, a straightforward way to manage dependent flows.
Sergey Moskvin
01/22/2025, 11:19 AM
Marvin
01/22/2025, 11:19 AM
You can use the `run_deployment` function with a synchronous wait. This approach allows the upstream flow to block until the downstream flow finishes.
Here's a conceptual approach to achieve this:
1. Trigger Downstream Flow: Use `run_deployment` to start the downstream flow.
2. Wait for Completion: By default (`timeout=None`), `run_deployment` polls the new flow run until it reaches a final state, so the call itself blocks. Alternatively, start the run with `timeout=0` and check its state in a loop yourself.
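Checking the state in a loop boils down to polling until the run reaches a final state or a timeout expires. A stdlib sketch, where `get_state` is a hypothetical callable standing in for a call that fetches the flow run's current state, and the set of final states is assumed to mirror Prefect's terminal state types:

```python
import time
from typing import Callable, Optional

FINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


def wait_until_final(
    get_state: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: Optional[float] = None,
) -> str:
    """Poll get_state() until it returns a final state; raise on timeout."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = get_state()
        if state in FINAL_STATES:
            return state
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"run still {state} after {timeout}s")
        time.sleep(poll_interval)


# Usage with a fake run that completes on the third poll
fake_states = iter(["SCHEDULED", "RUNNING", "COMPLETED"])
print(wait_until_final(lambda: next(fake_states), poll_interval=0.01))  # COMPLETED
```

Returning the final state (rather than assuming success) leaves the caller free to treat `FAILED` or `CRASHED` differently from `COMPLETED`.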
Below is a simplified example:
```python
from prefect import flow
from prefect.deployments import run_deployment


@flow
def upstream():
    # Start the downstream deployment and wait for it: with timeout=None,
    # run_deployment polls until the flow run reaches a final state
    downstream_flow_run = run_deployment(
        "downstream/deployment-b", parameters={"foo": "bar"}, timeout=None
    )
    print(f"Downstream flow run ended in state: {downstream_flow_run.state.name}")
    print("Downstream flow finished, proceeding with upstream tasks.")


@flow
def downstream(foo: str):
    print("Running downstream flow with parameter:", foo)
```
In this example:
- `run_deployment` starts the downstream flow and, because `timeout=None`, does not return until that flow run has finished.
- The upstream flow therefore blocks until the downstream flow completes. This only holds the current upstream run; it does not by itself stop other runs of the upstream flow from starting, which requires a concurrency limit.
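One way a deadlock can arise with this pattern: if the upstream and downstream runs draw from the same concurrency limit of 1, the waiting upstream run holds the only slot the downstream run needs. A stdlib threading sketch shows this; the `slots` semaphore is a hypothetical stand-in for a shared work pool or concurrency limit, and the downstream acquire gets a timeout so the demo reports the problem instead of hanging.

```python
import threading

slots = threading.BoundedSemaphore(1)  # shared limit of 1 slot


def downstream(result: dict) -> None:
    # Needs a slot to start; give up after 0.2 s instead of hanging forever
    if slots.acquire(timeout=0.2):
        try:
            result["downstream"] = "completed"
        finally:
            slots.release()
    else:
        result["downstream"] = "never started (no free slot)"


def upstream() -> dict:
    result: dict = {}
    with slots:  # the upstream run occupies the only slot...
        t = threading.Thread(target=downstream, args=(result,))
        t.start()
        t.join()  # ...while blocking until downstream finishes
    return result


print(upstream())  # {'downstream': 'never started (no free slot)'}
```

In this sketch, raising the shared limit to 2 (or giving the downstream work its own limit) lets the downstream step start.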
This approach synchronizes the upstream and downstream flows, so the upstream flow cannot finish before the downstream work is done. Handle exceptions and failed downstream states explicitly, and make sure the waiting upstream run does not hold a concurrency slot the downstream run needs, to avoid deadlocks or unexpected behavior.
Sergey Moskvin
01/22/2025, 11:24 AM
Marvin
01/22/2025, 11:24 AM
Sergey Moskvin
01/22/2025, 11:25 AM
Marvin
01/22/2025, 11:25 AM
```bash
prefect work-pool create my-work-pool
prefect work-pool set-concurrency-limit my-work-pool 5
```
These commands create a work pool named `my-work-pool` and set its concurrency limit to 5, meaning at most 5 flow runs from this pool can be active at the same time.
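A work pool concurrency limit behaves roughly like a counting semaphore around flow-run execution, although in Prefect it is enforced by the orchestration layer rather than an in-process lock. The stdlib sketch below (hypothetical `run_flow`, threads standing in for flow runs) submits 8 runs against a limit of 5 and records the peak number active at once:

```python
import threading
import time

LIMIT = 5
pool_slots = threading.BoundedSemaphore(LIMIT)
lock = threading.Lock()
active = 0
peak = 0


def run_flow(i: int) -> None:
    global active, peak
    with pool_slots:  # wait for a free slot, like a run queued on a full pool
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulated work
        with lock:
            active -= 1


threads = [threading.Thread(target=run_flow, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"peak concurrent runs: {peak}")  # never exceeds 5
```

The three runs that cannot get a slot simply wait; they start as earlier runs release theirs.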
### Slot Decay
Slot decay applies to global concurrency limits and controls how quickly occupied slots become available again; a limit with slot decay behaves like a rate limit. It is set with the `slot_decay_per_second` parameter:
```bash
# Create a global concurrency limit with slot decay
prefect gcl create my-concurrency-limit --limit 5 --slot-decay-per-second 0.5
```
- `--limit`: The maximum number of slots that can be occupied at once.
- `--slot-decay-per-second`: How many occupied slots are freed per second. A higher value refreshes slots faster, allowing more frequent executions.
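As a rough model of what slot decay does, the sketch below treats the limit as a leaky bucket: occupied slots drain continuously at `slot_decay_per_second`, and an acquire succeeds only if the bucket has room. This is an assumption for illustration; `DecayingLimit` is a hypothetical class and Prefect's exact accounting may differ.

```python
from dataclasses import dataclass


@dataclass
class DecayingLimit:
    """Toy leaky-bucket model of a concurrency limit with slot decay."""
    limit: int
    slot_decay_per_second: float
    occupied: float = 0.0
    last: float = 0.0  # timestamp (seconds) of the last update

    def _decay(self, now: float) -> None:
        # Occupied slots drain at slot_decay_per_second, never below zero
        elapsed = now - self.last
        self.occupied = max(0.0, self.occupied - elapsed * self.slot_decay_per_second)
        self.last = now

    def try_acquire(self, now: float, slots: int = 1) -> bool:
        self._decay(now)
        if self.occupied + slots <= self.limit:
            self.occupied += slots
            return True
        return False


gcl = DecayingLimit(limit=5, slot_decay_per_second=0.5)
burst = [gcl.try_acquire(now=0.0) for _ in range(6)]
print(burst)                     # [True, True, True, True, True, False]
print(gcl.try_acquire(now=1.0))  # False: only 0.5 slots freed after 1 s
print(gcl.try_acquire(now=2.0))  # True: one full slot freed after 2 s
```

With a decay of 0.5, a full limit admits a new acquisition roughly every 2 seconds; doubling the decay halves that interval.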
### Example Usage
Combining the two settings works as follows:
- A work pool concurrency limit counts flow runs: an upstream flow run that is waiting on a downstream run still occupies one of the pool's slots.
- Other flows can still start as long as the pool has free slots.
- Slot decay only affects global concurrency limits, whose slots are taken by code that explicitly acquires them (for example, Prefect's `concurrency` and `rate_limit` context managers). With decay set, occupied slots free up gradually over time, so new tasks can start.
For more information, you can refer to Prefect's documentation on global concurrency limits and work pool management.
Sergey Moskvin
01/22/2025, 11:28 AM
Marvin
01/22/2025, 11:28 AM
Sergey Moskvin
01/22/2025, 11:30 AM
Marvin
01/22/2025, 11:30 AM
```python
from prefect import flow, task
import asyncio


@task
async def async_task(name):
    await asyncio.sleep(1)
    print(f"Task {name} completed")


@flow
async def async_flow():
    # Run the three tasks concurrently on the event loop
    await asyncio.gather(
        async_task("A"),
        async_task("B"),
        async_task("C"),
    )


@flow
def main_flow():
    # Synchronous parent flow that drives the async flow to completion
    asyncio.run(async_flow())


if __name__ == "__main__":
    main_flow()
```
### Explanation
- Asynchronous Tasks: The `async_task` function is defined as an asynchronous task using `async def`. It simulates an I/O-bound operation with `await asyncio.sleep(1)`.
- Asynchronous Flow: The `async_flow` function is an asynchronous flow that uses `asyncio.gather` to run multiple tasks concurrently. Tasks "A", "B", and "C" wait at the same time, so the flow takes roughly 1 second instead of 3.
- Main Flow: The `main_flow` function runs `async_flow` using `asyncio.run`, the standard way to execute asynchronous code from synchronous Python.
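To check that `asyncio.gather` really overlaps the waits, here is the same pattern in plain asyncio with Prefect removed (a simplification so it runs anywhere; `fake_io` is a hypothetical helper). Three 0.2 s sleeps finish in roughly 0.2 s total rather than 0.6 s:

```python
import asyncio
import time


async def fake_io(name: str) -> str:
    await asyncio.sleep(0.2)  # yields to the event loop while "waiting"
    return f"Task {name} completed"


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    results = await asyncio.gather(fake_io("A"), fake_io("B"), fake_io("C"))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)  # ['Task A completed', 'Task B completed', 'Task C completed']
print(f"elapsed: {elapsed:.2f}s")  # about 0.2 s, not 0.6 s
```

`gather` returns results in the order the awaitables were passed, regardless of which finishes first.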
### Considerations
- Concurrency Management: asyncio interleaves coroutines on a single thread, so the speed-up comes only from overlapping waits such as I/O. CPU-bound work, or a blocking call made inside a coroutine, still runs one at a time and stalls the other tasks.
- Resource Utilization: Running multiple flows or tasks concurrently on a single worker can lead to resource contention, so it's important to monitor and manage your system's resources appropriately.
This approach can be particularly useful for I/O-bound operations where tasks can yield control while waiting for external resources, thereby improving the overall throughput of your workflows.
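The single-thread caveat is easy to demonstrate: a coroutine that calls blocking code (here `time.sleep`, standing in for a blocking I/O call) stalls every other coroutine, whereas handing the call to `asyncio.to_thread` (Python 3.9+) lets the waits overlap. A plain-asyncio sketch, with hypothetical helper names:

```python
import asyncio
import time


async def blocking_step() -> None:
    time.sleep(0.1)  # blocks the whole event loop


async def offloaded_step() -> None:
    await asyncio.to_thread(time.sleep, 0.1)  # runs in a worker thread


async def timed(step) -> float:
    start = time.perf_counter()
    await asyncio.gather(step(), step(), step())
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_step))
offloaded_elapsed = asyncio.run(timed(offloaded_step))
print(f"blocking:  {blocking_elapsed:.2f}s")   # about 0.3 s: runs one after another
print(f"offloaded: {offloaded_elapsed:.2f}s")  # about 0.1 s: waits overlap
```

Offloading to threads helps with blocking I/O; pure-Python CPU-bound work in those threads would still be serialized by the GIL.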