@Marvin refactor so that trigger fires when 5...
# ask-marvin
r
@Marvin refactor so that trigger fires when 5 events occur within 1 hour
{
  "type": "event",
  "match": {
    "prefect.resource.id": "prefect.flow-run.*"
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-stage",
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-prod"
    ],
    "prefect.resource.role": "tag"
  },
  "after": [
    "prefect.flow-run.Pending"
  ],
  "expect": [
    "prefect.flow-run.*"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Proactive",
  "threshold": 1,
  "within": 1200
}
m
To refactor the trigger so that it fires when 5 events occur within 1 hour, you need to adjust the `"threshold"` and `"within"` properties in the given configuration. The `"threshold"` should be set to 5, since you want the trigger to fire after 5 events. The `"within"` property should be set to 3600, which is the number of seconds in 1 hour. The revised configuration would look something like this:
json
{
  "type": "event",
  "match": {
    "prefect.resource.id": "prefect.flow-run.*"
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-stage",
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-prod"
    ],
    "prefect.resource.role": "tag"
  },
  "after": [
    "prefect.flow-run.Pending"
  ],
  "expect": [
    "prefect.flow-run.*"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Proactive",
  "threshold": 5,
  "within": 3600
}
With these values, the trigger evaluates a threshold of 5 matching events against a window of 3600 seconds (1 hour). Please adjust your actual configuration file or trigger setup accordingly.
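If the trigger is kept as JSON in a file or script, the same change can be applied programmatically. The following is a minimal sketch: `retune_trigger` is a hypothetical helper name, not part of Prefect, and it only edits the dictionary without validating it against Prefect's trigger schema.

```python
import json
from datetime import timedelta


def retune_trigger(trigger: dict, threshold: int, window: timedelta) -> dict:
    """Return a copy of a trigger dict with new threshold and window values."""
    updated = dict(trigger)  # shallow copy; nested match blocks are left untouched
    updated["threshold"] = threshold
    # "within" is expressed in whole seconds
    updated["within"] = int(window.total_seconds())
    return updated


# Abbreviated stand-in for the trigger above (match blocks omitted for brevity)
original = {"type": "event", "posture": "Proactive", "threshold": 1, "within": 1200}
revised = retune_trigger(original, threshold=5, window=timedelta(hours=1))
print(json.dumps(revised, indent=2))
```

Deriving `within` from a `timedelta` avoids hand-computing the number of seconds when the window changes.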
r
@Marvin should i use reactive or proactive? i want the trigger to fire if 5 flow runs stay stuck in Pending for more than 20 minutes in a given hour
m
Given your requirements, the count-based part of your condition calls for a "Reactive" posture. A "Reactive" trigger fires when the expected events occur, that is, when the threshold is reached within the window. A "Proactive" trigger fires when the expected events do not occur within the window. Counting 5 occurrences within an hour is a reactive condition, whereas detecting that a flow run has stayed in "Pending" for 20 minutes is an absence-of-events condition, which is what a proactive trigger like your original one expresses. The configuration below does not express both at once: it only detects the accumulation of events. Here's an example configuration with a "Reactive" posture (not complete trigger logic for your requirements, since the 20-minute condition has to be handled separately):
json
{
  "type": "event",
  "match": {
    "prefect.resource.id": "prefect.flow-run.*"
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-stage",
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-prod"
    ],
    "prefect.resource.role": "tag"
  },
  "after": [
    "prefect.flow-run.Pending"
  ],
  "expect": [
    "prefect.flow-run.*"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Reactive",
  "threshold": 5,
  "within": 3600
}
Please note that this configuration only counts matching events within an hour; it does not enforce the condition that flow runs must be stuck in "Pending" for more than 20 minutes. You would likely need additional logic outside of this configuration, such as a script or a custom task in your flows, to track how long flow runs have been in "Pending" and then take action once they exceed 20 minutes.
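As a rough illustration of that additional logic, the "5 within an hour" part can be checked with a sliding window over timestamps. This is a sketch under assumptions: `threshold_reached` is a hypothetical helper, and the timestamps stand for the moments at which runs were detected as stuck, however you collect them.

```python
from datetime import datetime, timedelta


def threshold_reached(event_times, threshold=5, window=timedelta(hours=1)):
    """Return True if any span of length `window` contains at least `threshold` events."""
    times = sorted(event_times)
    start = 0
    for end, current in enumerate(times):
        # Move the left edge forward until the span fits inside the window.
        while current - times[start] > window:
            start += 1
        if end - start + 1 >= threshold:
            return True
    return False


base = datetime(2024, 1, 1, 12, 0)
clustered = [base + timedelta(minutes=10 * i) for i in range(5)]  # 5 events in 40 minutes
spread = [base + timedelta(minutes=30 * i) for i in range(5)]     # 5 events over 2 hours
print(threshold_reached(clustered), threshold_reached(spread))
```

The first list reaches the threshold and the second does not, because no one-hour span of the second list holds more than three events.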
n
hi @Robert Phamle! have you checked out the new form for making automations?
r
what new form? i have just been creating this through the web ui
n
you dont need to write JSON anymore if you dont want to, we've added a more visual form to the UI for custom triggers
r
ok i have seen this, dont mind the json really
n
👍
fwiw marvin was pretty much right, just up the `threshold`
r
gotcha, but i realized that that won't work for me so back to the old way
@Marvin rewrite so that trigger fires when the flow run stays in pending for longer than 20 minutes
m
To ensure the trigger fires when a flow run stays in the "Pending" state for longer than 20 minutes, you need a different approach from the count-based configuration above, which does not measure how long a run has stayed in a particular state. One option is custom logic, for example a Prefect task that periodically checks the status of flow runs and takes action when the condition is met. A duration check implemented this way relies on scheduled checks or polling. Here is a sketch of how you might structure this check within a Prefect task and flow (please note that it needs to be adapted to your actual Prefect version and environment):
python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import flow, get_client, task
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType

# Sketch only: assumes the Prefect 2.x client API; adapt imports to your version.
MAX_PENDING = timedelta(minutes=20)


@task
async def check_pending_flows():
    # Select flow runs whose current state type is PENDING.
    # Add further filters (flow, deployment, tags) to narrow down the runs.
    pending_filter = FlowRunFilter(
        state=FlowRunFilterState(
            type=FlowRunFilterStateType(any_=[StateType.PENDING])
        )
    )
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(flow_run_filter=pending_filter)

    # state.timestamp records when the run entered its current state.
    cutoff = datetime.now(timezone.utc) - MAX_PENDING
    pending_too_long = [
        run.id
        for run in flow_runs
        if run.state is not None and run.state.timestamp < cutoff
    ]

    if pending_too_long:
        # Trigger your desired action here, e.g., send a notification or call an API.
        print(f"Flow runs pending for too long: {pending_too_long}")
    return pending_too_long


@flow(name="check-pending-flows")
async def check_pending_flows_flow():
    return await check_pending_flows()


if __name__ == "__main__":
    asyncio.run(check_pending_flows_flow())

# Schedule this flow to run every few minutes so that it detects runs
# pending for more than 20 minutes soon after they cross the limit.
In the above example, the `check_pending_flows` task checks flow runs in a "Pending" state and determines whether any have been in that state for over 20 minutes. If such runs are found, you can trigger any event or notification system you have in place. Keep in mind that this is not a standard trigger configuration that can simply be adjusted in a configuration file; rather, it's a custom implementation that needs to be integrated into your Prefect workflow setup.
r
@Marvin what does the following do exactly then?
{
  "type": "event",
  "match": {
    "prefect.resource.id": "prefect.flow-run.*"
  },
  "match_related": {
    "prefect.resource.id": [
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-stage",
      "prefect.tag.pagerduty_automation:pd-sq-sqprefect-tests-prod"
    ],
    "prefect.resource.role": "tag"
  },
  "after": [
    "prefect.flow-run.Pending"
  ],
  "expect": [
    "prefect.flow-run.*"
  ],
  "for_each": [
    "prefect.resource.id"
  ],
  "posture": "Proactive",
  "threshold": 1,
  "within": 1200
}
m
The JSON snippet you've provided is a configuration for an event-based trigger in Prefect. Here's what each part does:
- `"type": "event"`: Specifies that this is an event-based trigger.
- `"match"`: Describes the resource an event must come from; here it matches events whose resource ID fits `"prefect.flow-run.*"` (the wildcard means any flow run).
- `"match_related"`: Defines related resources that must also match for the event to be relevant. Here it restricts matching to flow runs related to one of two tags (`pd-sq-sqprefect-tests-stage` and `pd-sq-sqprefect-tests-prod`, under the `pagerduty_automation` key).
- `"after"`: Specifies the event that starts the evaluation (in this case, a flow run entering the "Pending" state).
- `"expect"`: Specifies which subsequent events are expected for a flow run (here, any flow run event).
- `"for_each"`: Evaluates the trigger separately for each matching `"prefect.resource.id"`, that is, per flow run.
- `"posture": "Proactive"`: A proactive trigger responds to the absence of the expected events: it fires when they do not occur within the time window. A reactive trigger, by contrast, fires when the expected events do occur.
- `"threshold": 1`: For a proactive trigger, the number of expected events. Here at least 1 event is expected.
- `"within": 1200`: The time window in seconds (1200 seconds is 20 minutes).

So, in summary, if a flow run with one of the specified tags enters the "Pending" state and no other event occurs for that flow run within 20 minutes, the trigger fires. In practice this covers a flow run that is stuck in "Pending" for 20 minutes. The trigger does not measure the duration of the state directly; it infers that the run is stuck from the absence of any further flow run event inside the window.
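To make the posture semantics concrete, here is a toy model of the decision for a single flow run. It is a simplified sketch, not Prefect's implementation: it assumes that a proactive trigger fires when fewer than `threshold` expected events arrive within `within` seconds of the `after` event, and it ignores matching and `for_each`. The function names are hypothetical.

```python
from datetime import datetime, timedelta


def proactive_fires(after_time, expected_times, threshold=1, within=1200):
    """Toy model: fire when fewer than `threshold` expected events arrive in the window."""
    deadline = after_time + timedelta(seconds=within)
    seen = sum(1 for t in expected_times if after_time < t <= deadline)
    return seen < threshold


def reactive_fires(after_time, expected_times, threshold=1, within=1200):
    """Toy model: fire when at least `threshold` expected events arrive in the window."""
    return not proactive_fires(after_time, expected_times, threshold, within)


pending_at = datetime(2024, 1, 1, 9, 0)
stuck_run = []                                      # no event follows Pending
healthy_run = [pending_at + timedelta(minutes=2)]   # e.g. the run moved on after 2 minutes

print(proactive_fires(pending_at, stuck_run))    # the stuck run fires the proactive trigger
print(proactive_fires(pending_at, healthy_run))  # the healthy run does not
```

Under this model the stuck run fires the proactive trigger after the 20-minute window elapses, while the healthy run never does.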