Alex Ilyin
09/11/2024, 8:25 PM
Marvin
09/11/2024, 8:25 PM
Alex Ilyin
09/11/2024, 8:25 PM
Marvin
09/11/2024, 8:25 PM
python
from prefect.triggers import all_finished, some_failed, any_successful

# Define a compound trigger
compound_trigger = some_failed(all_finished, any_successful)

@prefect.flow(trigger=compound_trigger)
def my_flow():
    task_result = some_task()
    another_task(upstream_tasks=[task_result])

# Define your tasks as per usual
@prefect.task
def some_task():
    return "Task Completed"

@prefect.task
def another_task():
    print("Another Task Running")

my_flow()
This example uses triggers like `all_finished`, `some_failed`, and `any_successful`. You can combine them to create complex dependencies.
You can read more about it here. Let me know if you need anything else! 😃
Alex Ilyin
09/11/2024, 8:26 PM
Marvin
09/11/2024, 8:26 PM
`prefect.events.TriggerTypes`. Here's an example:
python
from prefect.events import CompoundTrigger

# Create your individual triggers
first_trigger = {
    "type": "event",
    "posture": "Proactive",
    "after": ["prefect.flow-run.Pending"],
    "expect": ["prefect.flow-run.Running", "prefect.flow-run.Crashed"],
    "for_each": ["prefect.resource.id"],
    "match_related": {
        "prefect.resource.name": "daily-customer-export",
        "prefect.resource.role": "flow"
    }
}
second_trigger = {
    "type": "event",
    "posture": "Reactive",
    "expect": ["prefect.work-pool.not-ready"],
    "match": {
        "prefect.resource.name": "kubernetes-workers",
    }
}
third_trigger = {
    "type": "metric",
    "metric": {
        "name": "lateness",
        "operator": ">",
        "threshold": 600,
        "range": 3600,
        "firing_for": 300
    }
}

# Create a compound trigger
compound_trigger = CompoundTrigger(
    require="any",
    triggers=[first_trigger, second_trigger, third_trigger]
)

# Output the compound trigger to verify
print(compound_trigger)
This example demonstrates how you can combine multiple event and metric triggers into a `CompoundTrigger`, which will activate if any of the specified conditions are met.
For more info, check the documentation. If you need more examples or details, let me know! 😃
Alex Ilyin
09/11/2024, 8:27 PM
`require` argument in the `CompoundTrigger` function there?
Marvin
09/11/2024, 8:28 PM
`require` parameter in the `CompoundTrigger` function determines the logical condition that must be met for the compound trigger to fire. Here’s a breakdown:
1. `"any"`: The compound trigger will fire if any of the individual triggers are satisfied.
2. `"all"`: The compound trigger will fire only if all of the individual triggers are satisfied.
3. A number (e.g., `1`, `2`): The compound trigger will fire if at least that number of individual triggers are satisfied.
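To make the three options concrete, here is a small plain-Python sketch of the rule as described in the list above. It is only an illustrative model: `compound_fires` and `child_fired` are hypothetical names, not part of Prefect, and the real evaluation happens inside Prefect's automation engine.

```python
from typing import Sequence, Union


def compound_fires(require: Union[str, int], child_fired: Sequence[bool]) -> bool:
    """Toy model of the `require` rule; NOT Prefect's implementation."""
    # Count how many of the child triggers are currently satisfied.
    fired_count = sum(1 for fired in child_fired if fired)
    if require == "any":
        return fired_count >= 1
    if require == "all":
        return fired_count == len(child_fired)
    # A positive integer means "at least this many child triggers".
    if isinstance(require, int) and not isinstance(require, bool) and require >= 1:
        return fired_count >= require
    raise ValueError(f"unsupported require value: {require!r}")


# Hypothetical state: the first and third child triggers have fired.
states = [True, False, True]
print(compound_fires("any", states))  # True
print(compound_fires("all", states))  # False
print(compound_fires(2, states))      # True
```

With three child triggers, `require=2` sits between `"any"` and `"all"`: two fired children are enough, one is not.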
So, in the example I provided:
python
compound_trigger = CompoundTrigger(
    require="any",  # It will trigger if any one of the individual triggers is satisfied
    triggers=[first_trigger, second_trigger, third_trigger]
)
You can replace `"any"` with `"all"` or a specific number based on your requirements.
For more info, refer to the Prefect documentation. Let me know if you have more questions! 😃
Nate
09/11/2024, 8:33 PM
Nate
09/11/2024, 8:33 PM
Nate
09/11/2024, 8:34 PM
from prefect.events import DeploymentCompoundTrigger
Alex Ilyin
09/11/2024, 8:37 PM
pydantic.error_wrappers.ValidationError: 1 validation error for Automation
actions
  field required (type=value_error.missing)
Alex Ilyin
09/11/2024, 8:37 PM
`actions=RunDeployment(deployment_id="e3eb9477-1273-47f0-95ad-d658e01d7972")` which works on a regular `EventTrigger` but not with the `CompoundTrigger` functions
Nate
09/11/2024, 8:38 PM
Alex Ilyin
09/11/2024, 8:38 PM
first_trigger = {
    "type": "event",
    "posture": "Reactive",
    "match": {
        "prefect.resource.id": [
            "prefect.flow-run.*"
        ]},
    "expect": ["prefect.flow-run.Completed"],
    "match_related":{
        "prefect.resource.id": [
            "prefect.deployment.2682b223-75bd-48ac-888f-00f6ec44470f"
        ]}
}
second_trigger = {
    "type": "event",
    "posture": "Reactive",
    "match": {
        "prefect.resource.id": [
            "prefect.flow-run.*"
        ]},
    "expect": ["prefect.flow-run.Completed"],
    "match_related":{
        "prefect.resource.id": [
            "prefect.deployment.e36a5e8c-0ef7-4a99-b8ec-d385705c5965"
        ]}
}
automation = Automation(
    name="test cli compound trigger",
    trigger=DeploymentCompoundTrigger(require="all",
        triggers=[first_trigger, second_trigger],
        actions=RunDeployment(deployment_id="e3eb9477-1273-47f0-95ad-d658e01d7972"))
).create()
Nate
09/11/2024, 8:38 PM
Alex Ilyin
09/11/2024, 8:39 PM
Alex Ilyin
09/11/2024, 8:39 PM
Nate
09/11/2024, 8:41 PM
Nate
09/11/2024, 8:45 PM
from prefect.automations import Automation
from prefect.events.actions import RunDeployment
from prefect.events.schemas.deployment_triggers import DeploymentCompoundTrigger

first_trigger = {
    "type": "event",
    "posture": "Reactive",
    "match": {"prefect.resource.id": ["prefect.flow-run.*"]},
    "expect": ["prefect.flow-run.Completed"],
    "match_related": {
        "prefect.resource.id": [
            "prefect.deployment.2682b223-75bd-48ac-888f-00f6ec44470f"
        ]
    },
}
second_trigger = {
    "type": "event",
    "posture": "Reactive",
    "match": {"prefect.resource.id": ["prefect.flow-run.*"]},
    "expect": ["prefect.flow-run.Completed"],
    "match_related": {
        "prefect.resource.id": [
            "prefect.deployment.e36a5e8c-0ef7-4a99-b8ec-d385705c5965"
        ]
    },
}
automation = Automation(
    name="test cli compound trigger",
    description="test cli compound trigger",
    enabled=True,
    trigger=DeploymentCompoundTrigger(
        name="test cli compound trigger",
        description="test cli compound trigger",
        within=None,
        enabled=True,
        parameters=None,
        job_variables=None,
        require="all",
        triggers=[first_trigger, second_trigger],
    ),
    actions=[RunDeployment(deployment_id="e3eb9477-1273-47f0-95ad-d658e01d7972")],
).create()
bc in general an Automation has
• a trigger
• a set of actions to perform when triggered
Nate
09/11/2024, 8:46 PM
Alex Ilyin
09/11/2024, 8:46 PM
Alex Ilyin
09/11/2024, 8:46 PM
Alex Ilyin
09/11/2024, 8:47 PM
Nate
09/11/2024, 8:47 PM
Nate
09/11/2024, 8:47 PM
Alex Ilyin
09/11/2024, 8:47 PM
Nate
09/11/2024, 8:47 PM
prefect automation ls
Alex Ilyin
09/11/2024, 8:48 PM
Alex Ilyin
09/11/2024, 8:48 PM
Nate
09/11/2024, 8:48 PM
Alex Ilyin
09/11/2024, 8:54 PM