Robert Phamle
04/25/2024, 9:45 PM
{
  "type": "event",
  "match": {
    "prefect.resource.id": "prefect.flow-run.*"
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-stage",
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-prod"
    ],
    "prefect.resource.role": "tag"
  },
  "after": [
    "prefect.flow-run.Pending"
  ],
  "expect": [
    "prefect.flow-run.*"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Proactive",
  "threshold": 1,
  "within": 1200
}
Marvin
04/25/2024, 9:45 PM
"threshold" and "within" properties in the given configuration. The "threshold" should be set to 5, since you want the trigger to fire after 5 events. The "within" property should be set to 3600, which is the number of seconds in 1 hour. The revised configuration would look something like this:
json
{
  "type": "event",
  "match": {
    "prefect.resource.id": "prefect.flow-run.*"
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-stage",
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-prod"
    ],
    "prefect.resource.role": "tag"
  },
  "after": [
    "prefect.flow-run.Pending"
  ],
  "expect": [
    "prefect.flow-run.*"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Proactive",
  "threshold": 5,
  "within": 3600
}
This configuration will now correctly set the system to trigger an event when 5 matching events occur within the span of 3600 seconds (or 1 hour). Please adjust accordingly in your actual configuration file or trigger setup.
Robert Phamle
04/25/2024, 9:47 PM
Marvin
04/25/2024, 9:48 PM
json
{
  "type": "event",
  "match": {
    "prefect.resource.id": "prefect.flow-run.*"
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-stage",
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-prod"
    ],
    "prefect.resource.role": "tag"
  },
  "after": [
    "prefect.flow-run.Pending"
  ],
  "expect": [
    "prefect.flow-run.*"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Reactive",
  "threshold": 5,
  "within": 3600
}
Please note, this configuration only ensures that the trigger checks for 5 flow runs in "Pending" within an hour; it does not enforce the condition that they must be stuck for more than 20 minutes. You would likely need additional custom logic outside of this configuration, possibly with a script or a custom task in your flows, to track the duration for which flow runs have been in "Pending" status and then take action if they exceed 20 minutes.
Nate
04/25/2024, 9:53 PM
Robert Phamle
04/25/2024, 9:56 PM
Nate
04/25/2024, 9:57 PM
Nate
04/25/2024, 9:57 PM
Robert Phamle
04/25/2024, 9:58 PM
Nate
04/25/2024, 9:58 PM
Nate
04/25/2024, 9:58 PM
threshold
Robert Phamle
04/25/2024, 9:59 PM
Robert Phamle
04/25/2024, 9:59 PM
Marvin
04/25/2024, 10:00 PM
python
from datetime import datetime, timedelta, timezone

from prefect import flow, get_client, task
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType


@task
async def check_pending_flows(max_pending: timedelta = timedelta(minutes=20)):
    # Ask the API for every flow run currently in a PENDING state. Add a
    # flow or tag filter to read_flow_runs() to narrow this to specific flows.
    pending_filter = FlowRunFilter(
        state=FlowRunFilterState(
            type=FlowRunFilterStateType(any_=[StateType.PENDING])
        )
    )
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(flow_run_filter=pending_filter)

    now = datetime.now(timezone.utc)
    # state.timestamp records when the run entered its current (Pending) state
    pending_too_long = [
        run.id for run in flow_runs if now - run.state.timestamp > max_pending
    ]
    if pending_too_long:
        # Trigger your desired action here, e.g., send a notification,
        # write a log entry, or call an external API.
        print(f"Flow runs pending for too long: {pending_too_long}")
    return pending_too_long


@flow(name="check-pending-flows")
async def check_pending_flows_flow():
    await check_pending_flows()

# The flow could be scheduled to run every X minutes to continuously check on the pending flow runs.
# Set up a schedule that aligns with your needs, ensuring that it runs often enough to detect
# runs pending for more than 20 minutes.
In the above example, the check_pending_flows task checks the flow runs in a "Pending" state and determines whether any have been in that state for over 20 minutes. If such runs are found, you can trigger any event or notification system you have in place.
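The 20-minute check described above can be isolated from the Prefect client as a pure function, which makes it easy to test on its own. This is a minimal sketch, not part of Prefect: the function name `pending_too_long`, the run IDs, and the input mapping are hypothetical, and it assumes you already know when each run entered "Pending".

```python
from datetime import datetime, timedelta, timezone


def pending_too_long(entered_pending, now, limit=timedelta(minutes=20)):
    """Return the IDs of runs that entered Pending more than `limit` ago.

    `entered_pending` maps a (hypothetical) run ID to the timezone-aware
    datetime at which that run entered the Pending state.
    """
    return [
        run_id
        for run_id, since in entered_pending.items()
        if now - since > limit
    ]


# Illustrative data only: three made-up runs at different ages.
now = datetime(2024, 4, 25, 22, 0, tzinfo=timezone.utc)
runs = {
    "run-a": now - timedelta(minutes=25),  # over the limit
    "run-b": now - timedelta(minutes=5),   # well under the limit
    "run-c": now - timedelta(minutes=20),  # exactly at the limit, not over it
}
print(pending_too_long(runs, now))  # ['run-a']
```

The comparison is strict, so a run that has been pending for exactly 20 minutes is not reported yet.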
Keep in mind that this is not a standard trigger configuration that can be simply adjusted in a configuration file; rather, it's a custom implementation that needs to be integrated into your Prefect workflow setup.
Robert Phamle
04/25/2024, 10:03 PM
{
  "type": "event",
  "match": {
    "prefect.resource.id": "prefect.flow-run.*"
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-stage",
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-prod"
    ],
    "prefect.resource.role": "tag"
  },
  "after": [
    "prefect.flow-run.Pending"
  ],
  "expect": [
    "prefect.flow-run.*"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Proactive",
  "threshold": 1,
  "within": 1200
}
Marvin
04/25/2024, 10:04 PM
"prefect.flow-run.*" (wildcard could be used to denote any flow run event).
- `"match_related"`: Defines related resource identifiers that must also match for the event to be relevant. Here it's matching events related to specific tags (e.g., pd-sq-sqprefect-tests-stage and pd-sq-sqprefect-tests-prod) that denote stages of an automation flow.
- `"after"`: Specifies the initial state after which an event should be considered (in this case: when a flow run enters the "Pending" state).
- `"expect"`: Specifies what kind of subsequent event is expected to occur for a flow run (e.g., any event related to a flow run).
- `"for_each"`: Indicates that this trigger configuration applies to each matching "prefect.resource.id".
- `"posture": "Proactive"`: This defines the trigger as "Proactive". A Proactive trigger fires when the expected events do not occur within the time window, whereas a "Reactive" trigger fires when they do occur.
- `"threshold": 1`: Specifies the number of expected events, which in this case is set to 1. For a Proactive trigger, the action fires when fewer than this many expected events arrive within the window.
- `"within": 1200`: Defines the time window in seconds, which is set to 1200 seconds (20 minutes). With a Proactive posture, the trigger fires if no expected event arrives for a flow run within 20 minutes of the `"after"` event.
So, in summary, if the system observes a flow run with one of the specified tags entering the "Pending" state and no other events occur for that flow run within 20 minutes, the trigger fires and runs its action. It is "Proactive" because it responds to the absence of the expected follow-up event rather than to an event arriving.
In practice, a flow run that sits in "Pending" emits no further state-change events, so this configuration detects flow runs that stay in "Pending" for more than 20 minutes.
With a "Reactive" posture and the same fields, the trigger would instead fire once the expected number of matching events had been seen within the window.
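As a rough illustration of the configuration discussed above (`"after"` Pending, `"expect"` any flow-run event, `"within"` 1200, `"threshold"` 1), the sketch below models a Proactive trigger as "fire for a resource if no expected event follows the `after` event inside the window". It is a simplified stand-in, not Prefect's actual evaluation engine: the function `proactive_firings`, the integer timestamps, and the resource IDs are all hypothetical, and it assumes that only events strictly after the Pending event count as expected follow-ups.

```python
from collections import defaultdict


def proactive_firings(events, after, expect_prefix, within, now):
    """Return resource IDs for which a simplified Proactive trigger fires.

    events: iterable of (timestamp_seconds, resource_id, event_name).
    A resource fires when its first `after` event is not followed by an
    event whose name starts with `expect_prefix` within `within` seconds,
    and that window has already elapsed at time `now`.
    """
    by_resource = defaultdict(list)
    for ts, resource_id, name in sorted(events):
        by_resource[resource_id].append((ts, name))

    fired = []
    for resource_id, history in by_resource.items():
        starts = [ts for ts, name in history if name == after]
        if not starts:
            continue  # this resource never emitted the `after` event
        start = starts[0]
        deadline = start + within
        followed_up = any(
            start < ts <= deadline and name.startswith(expect_prefix)
            for ts, name in history
        )
        if not followed_up and now >= deadline:
            fired.append(resource_id)
    return fired


# Illustrative events only; the run IDs are made up.
events = [
    (0, "prefect.flow-run.aaa", "prefect.flow-run.Pending"),
    (30, "prefect.flow-run.aaa", "prefect.flow-run.Running"),   # moved on in time
    (0, "prefect.flow-run.bbb", "prefect.flow-run.Pending"),    # never moves on
    (100, "prefect.flow-run.ccc", "prefect.flow-run.Pending"),
    (1500, "prefect.flow-run.ccc", "prefect.flow-run.Running"),  # too late
]
print(proactive_firings(events, "prefect.flow-run.Pending",
                        "prefect.flow-run.", within=1200, now=2000))
# ['prefect.flow-run.bbb', 'prefect.flow-run.ccc']
```

Because each resource is evaluated separately, this also mirrors the role of `"for_each": ["prefect.resource.id"]`: one stuck run fires on its own, regardless of what other runs are doing.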