Sanjeev
12/28/2023, 1:47 PM
Marvin
12/28/2023, 1:48 PM
yaml
flow.py:
decorated_fn:
work_pool:
  name: my-process-pool
triggers:
  - enabled: true
    match:
      prefect.resource.id: my.external.resource
    expect:
      - external.resource.pinged
    parameters:
      param_1: "{{ event }}"
At deployment time, this will create a linked automation that is triggered by events matching your chosen grammar, which will pass the templatable event as a parameter to the deployment's flow run.
Additionally, you can use the --trigger argument with prefect deploy to pass either a JSON string or a path to a .yaml or .json file that describes the trigger configuration.
For example, passing a trigger as a JSON string:
shell
prefect deploy -n your-deployment-name --path path/to/flow.py --trigger '{"enabled":true,"match":{"prefect.resource.id":"my.external.resource"},"expect":["external.resource.pinged"],"parameters":{"param_1":"{{ event }}"} }'
When defining triggers, the parameters section can be written to include templated references to the event data, such as "{{ event }}", which will take the CloudEvent's data attribute from the event payload.
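Hand-writing the JSON for --trigger inside shell quotes is easy to get wrong. A minimal sketch, assuming the same trigger fields as the example above, that builds the argument with Python's standard library. `build_trigger_arg` is a hypothetical helper name for illustration, not part of Prefect:

```python
import json
import shlex


def build_trigger_arg(resource_id: str, event_name: str, param_name: str) -> str:
    """Return a shell-quoted JSON string suitable for a --trigger argument.

    Hypothetical helper for illustration; the keys mirror the example above.
    """
    trigger = {
        "enabled": True,
        "match": {"prefect.resource.id": resource_id},
        "expect": [event_name],
        # The template stays literal text here; this script does not render it.
        "parameters": {param_name: "{{ event }}"},
    }
    return shlex.quote(json.dumps(trigger))


trigger_arg = build_trigger_arg(
    "my.external.resource", "external.resource.pinged", "param_1"
)
print(f"--trigger {trigger_arg}")
```

Building the dictionary first and serializing it with `json.dumps` avoids unbalanced braces and quotes, and `shlex.quote` takes care of the outer shell quoting.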
For more detailed information and specific use cases, please refer to the Prefect documentation on webhooks and automations.
Sanjeev
12/28/2023, 2:46 PM
event_payload = {
    "s3_folder": "folder_name",
    "s3_path": "path_to_s3",
}
emit_event(
    event="flow1_completion",
    resource={"prefect.resource.id": f"coder.{name}"},
    payload=event_payload,
)
I have also designed a deployment where I mention this event as the trigger:
def flow_2(s3_folder, s3_path) -> None:
    """Triggering Flow 2"""
    log = get_run_logger()
    log.info("Triggering Flow 2")
    log.info(print(s3_folder))
    log.info(print(s3_path))

trigger_on_complete = DeploymentTrigger(
    name="Wait for Flow1 Upstream deployment",
    enabled=True,
    match_related={
        "prefect.resource.name": "my-first-deployment",
    },
    expect={"flow1_completion"},
    parameters={
        "event_param": "{{flow1_completion}}",
    }  # what events to wait for
)

if __name__ == "__main__":
    # create your first deployment
    # repo_info.serve(name="my-first-deployment")
    flow_2.serve(name="flow2-deployment", triggers=[trigger_on_complete])
How do I modify this code to access the event_payload variables s3_folder & s3_path from flow_2?
Taylor Curran
12/28/2023, 4:01 PM
In particular, we need to specify the dynamic values from our webhook event in JSON using Jinja2 templating syntax. Here’s what we want to input on the JSON tab.
Taylor Curran
12/28/2023, 5:24 PM
Taylor Curran
12/28/2023, 5:24 PM
Sanjeev
12/28/2023, 5:24 PM
Sanjeev
12/29/2023, 3:19 PM
trigger_on_complete = DeploymentTrigger(
    name="Wait for Flow1 Upstream deployment",
    enabled=True,
    match_related={
        "prefect.resource.name": "my-first-deployment",
    },
    expect={"flow1_completion"},
    parameters={
        "s3_folder": "{{flow1_completion.payload.s3_folder}}",
        "s3_path": "{{flow1_completion.payload.s3_path}}"
    }  # what events to wait for
)
Even this isn't working. Any luck on your side?
Taylor Curran
12/29/2023, 5:31 PM
from prefect.events import DeploymentTrigger doesn’t seem to be working for me
Sanjeev
12/29/2023, 6:10 PM
import httpx
from prefect import flow, task
from prefect.events import emit_event
Flow 2:
from prefect import flow, get_run_logger
from prefect.events.schemas import DeploymentTrigger
Sanjeev
12/29/2023, 6:11 PM
Taylor Curran
12/29/2023, 7:26 PM
.
when you emit events like so:
@flow
def emit_event_flow(s3_folder: str, s3_path: str) -> None:
    """Emitting Event"""
    event_payload = {"s3_folder": "folder_name", "s3_path": "path_to_s3"}
    name = "taylor"
    emit_event(
        event="flow1_completion.123",
        resource={"prefect.resource.id": f"coder.{name}"},
        payload=event_payload,
    )
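The event name above carries a `.123` suffix, so a trigger that expects exactly `flow1_completion` would no longer line up with it, while a trailing-wildcard pattern such as `flow1_completion.*` would. A rough sketch of that distinction, assuming a trailing `*` means "any event name with this prefix". `event_matches` is a hypothetical stand-in written for illustration, not Prefect's own matcher:

```python
def event_matches(expected: str, event_name: str) -> bool:
    """Illustrative matcher: exact match, or prefix match on a trailing '*'."""
    if expected.endswith("*"):
        # Drop the '*' and compare the remaining prefix.
        return event_name.startswith(expected[:-1])
    return expected == event_name


emitted = "flow1_completion.123"

print(event_matches("flow1_completion", emitted))    # False
print(event_matches("flow1_completion.*", emitted))  # True
```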
Sanjeev
12/29/2023, 7:32 PM
trigger_on_complete code remain unchanged?
Taylor Curran
12/29/2023, 9:14 PM
# Flow 1: emits the event
from prefect import flow
from prefect.events import emit_event

@flow
def emit_event_flow(s3_folder: str, s3_path: str) -> None:
    """Emitting Event"""
    event_payload = {"s3_folder": "folder_name", "s3_path": "path_to_s3"}
    name = "taylor"
    emit_event(
        event="flow1_completion.123",
        resource={"prefect.resource.id": f"coder.{name}"},
        payload=event_payload,
    )

if __name__ == "__main__":
    emit_event_flow.serve("my-first-deployment")

# Flow 2: runs when a matching event arrives
from prefect import flow, get_run_logger
from prefect.events.schemas import DeploymentTrigger

@flow
def flow_2(s3_folder, s3_path) -> None:
    """Triggering Flow 2"""
    log = get_run_logger()
    log.info("Triggering Flow 2")
    # log the values directly; print() returns None
    log.info(s3_folder)
    log.info(s3_path)

trigger_on_complete = DeploymentTrigger(
    name="Wait for Flow1 Upstream deployment",
    enabled=True,
    match_related={
        "prefect.resource.name": "my-first-deployment",
    },
    expect={"flow1_completion.*"},  # what events to wait for
    parameters={
        "s3_folder": "{{event.payload.s3_folder}}",
        "s3_path": "{{event.payload.s3_path}}",
    },
)

if __name__ == "__main__":
    # create your first deployment
    # repo_info.serve(name="my-first-deployment")
    flow_2.serve(name="flow2-deployment", triggers=[trigger_on_complete])
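To picture why the `{{event.payload.s3_folder}}` templates in the trigger resolve while a name like `flow1_completion` does not, here is a simplified sketch using only the standard library. It assumes the template context contains a single top-level name, `event`. The `resolve` function and the sample `event` dictionary are hypothetical stand-ins; real Jinja2 rendering is richer and may treat unknown names differently:

```python
import re
from typing import Any

# Hypothetical stand-in for the event that satisfied the trigger.
event = {
    "event": "flow1_completion.123",
    "payload": {"s3_folder": "folder_name", "s3_path": "path_to_s3"},
}

TEMPLATE = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def resolve(template: str, context: dict[str, Any]) -> str:
    """Replace each {{ dotted.name }} with the value found in context."""

    def lookup(match: re.Match) -> str:
        value: Any = context
        for key in match.group(1).split("."):
            value = value[key]  # raises KeyError for unknown names
        return str(value)

    return TEMPLATE.sub(lookup, template)


# Only the generic name "event" is available to templates in this sketch.
context = {"event": event}
print(resolve("{{event.payload.s3_folder}}", context))  # folder_name
print(resolve("{{ event.payload.s3_path }}", context))  # path_to_s3

try:
    resolve("{{flow1_completion.payload.s3_folder}}", context)
except KeyError as missing:
    print(f"unknown template name: {missing}")
```

The lookup always starts from `event`, whatever the emitted event was called, which is why the event's own name never appears in the template.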
Sanjeev
12/29/2023, 9:15 PM
Taylor Curran
12/29/2023, 9:16 PM
event.payload.s3_path, not flow1_completion
Taylor Curran
12/29/2023, 9:16 PM
event is just a generic name for whatever the event was that satisfied the conditions of the trigger
Taylor Curran
12/29/2023, 9:17 PM
Sanjeev
12/29/2023, 9:17 PM
Taylor Curran
12/29/2023, 9:18 PM
Sanjeev
12/29/2023, 9:19 PM