<@ULVA73B9P> How do I pass a event payload as a tr...
# ask-marvin
s
@Marvin How do I pass an event payload as a trigger parameter for a deployment?
m
To pass an event payload as a trigger parameter for a deployment in Prefect 2.x, you can define an event trigger in your `prefect.yaml` file. The trigger's `parameters` section lets you template the event into the flow run's parameters:
Copy code
yaml
deployments:
  - name: your-deployment-name  # placeholder name
    entrypoint: path/to/flow.py:decorated_fn
    work_pool:
      name: my-process-pool
    triggers:
      - enabled: true
        match:
          prefect.resource.id: my.external.resource
        expect:
          - external.resource.pinged
        parameters:
          param_1: "{{ event }}"
At deployment time, this will create a linked automation that is triggered by events matching your chosen grammar, which will pass the templatable event as a parameter to the deployment's flow run. Additionally, you can use the `--trigger` argument with `prefect deploy` to pass either a JSON string or a path to a `.yaml` or `.json` file that describes the trigger configuration. For example, passing a trigger as a JSON string:
Copy code
shell
prefect deploy -n your-deployment-name --path path/to/flow.py --trigger '{"enabled":true,"match":{"prefect.resource.id":"my.external.resource"},"expect":["external.resource.pinged"],"parameters":{"param_1":"{{ event }}"} }'
When defining triggers, the `parameters` section can include templated references to the event data, such as `"{{ event }}"`, which renders the triggering event; values carried in the event's payload are available under `event.payload`. For more detailed information and specific use cases, please refer to the Prefect documentation on webhooks and automations.
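A minimal sketch of building the `--trigger` JSON string from Python instead of typing it by hand, using only the standard library. The trigger fields mirror the example in this message; `build_trigger_json` is a hypothetical helper name and not part of Prefect, and the deployment name is a placeholder.

```python
import json
import shlex


def build_trigger_json(resource_id: str, event_name: str, parameters: dict) -> str:
    """Serialize a trigger spec in the shape shown in this thread (hypothetical helper)."""
    trigger = {
        "enabled": True,
        "match": {"prefect.resource.id": resource_id},
        "expect": [event_name],
        "parameters": parameters,
    }
    return json.dumps(trigger)


trigger_json = build_trigger_json(
    "my.external.resource",
    "external.resource.pinged",
    {"param_1": "{{ event }}"},
)

# shlex.quote keeps the braces and double quotes intact when pasted into a POSIX shell.
command = f"prefect deploy -n your-deployment-name --trigger {shlex.quote(trigger_json)}"
print(command)
```

Generating the string this way avoids hand-escaping quotes inside the JSON, which is an easy place to make mistakes on the command line.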
s
@Nate/@Taylor Curran Need some help with this. I'm creating the following event.
Copy code
event_payload = {
    "s3_folder": "folder_name",
    "s3_path": "path_to_s3",
}
emit_event(
    event="flow1_completion",
    resource={"prefect.resource.id": f"coder.{name}"},
    payload=event_payload,
)
I have also designed a deployment where I mention this event as the trigger:
Copy code
@flow
def flow_2(s3_folder, s3_path) -> None:
    """Triggering Flow 2"""
    log = get_run_logger()
    log.info("Triggering Flow 2")
    log.info(s3_folder)
    log.info(s3_path)

trigger_on_complete = DeploymentTrigger(
    name="Wait for Flow1 Upstream deployment",
    enabled=True,
    match_related={
        "prefect.resource.name": "my-first-deployment",
    },
    expect={"flow1_completion"},  # what events to wait for
    parameters={
        "event_param": "{{flow1_completion}}",
    },
)

if __name__ == "__main__":
    # create your first deployment
    # repo_info.serve(name="my-first-deployment")
    flow_2.serve(name="flow2-deployment", triggers=[trigger_on_complete])
How do I modify this code to access the `event_payload` variables `s3_folder` and `s3_path` from `flow_2`?
t
Hi @Sanjeev, as a start I think this blog post should help.
In particular, we need to specify the dynamic values from our webhook event in JSON using Jinja2 templating syntax. Here’s what we want to input on the JSON tab.
You’ll want to play around with what you’re doing there already, e.g. parameters: {{ event.payload.s3bucket }}, but the triggering event and any data it carries in its payload will be available under the event key.
If I have time today I will try to cook up an example.
s
Thank you!!
Hey @Taylor Curran,
Copy code
trigger_on_complete = DeploymentTrigger(
    name="Wait for Flow1 Upstream deployment",
    enabled=True,
    match_related={
        "prefect.resource.name": "my-first-deployment",   
    },
    expect={"flow1_completion"},
    parameters={
        "s3_folder": "{{flow1_completion.payload.s3_folder}}",
        "s3_path": "{{flow1_completion.payload.s3_path}}"
    }  # what events to wait for
)
Even this isn't working. Any luck on your side?
t
Hi @Sanjeev I’m working on reproducing this. Could you send me your import statements from the top of these scripts? `from prefect.events import DeploymentTrigger` doesn’t seem to be working for me.
s
For Flow1:
Copy code
import httpx
from prefect import flow, task
from prefect.events import emit_event
For Flow 2:
Copy code
from prefect import flow, get_run_logger
from prefect.events.schemas import DeploymentTrigger
We're currently using version 2.13.1
t
Thanks @Sanjeev, one thing I found is that you need to add a `.` to the event name when you emit events, like so:
Copy code
@flow
def emit_event_flow(s3_folder: str, s3_path: str) -> None:
    """Emitting Event"""

    event_payload = {"s3_folder": "folder_name", "s3_path": "path_to_s3"}
    name = "taylor"
    emit_event(
        event="flow1_completion.123",
        resource={"prefect.resource.id": f"coder.{name}"},
        payload=event_payload,
    )
s
Does the `trigger_on_complete` code remain unchanged?
t
Hi @Sanjeev I got the following scripts to work for me:
Copy code
from prefect import flow
from prefect.events import emit_event


@flow
def emit_event_flow(s3_folder: str, s3_path: str) -> None:
    """Emitting Event"""

    event_payload = {"s3_folder": "folder_name", "s3_path": "path_to_s3"}
    name = "taylor"
    emit_event(
        event="flow1_completion.123",
        resource={"prefect.resource.id": f"coder.{name}"},
        payload=event_payload,
    )


if __name__ == "__main__":
    emit_event_flow.serve(
        "my-first-deployment")
Copy code
from prefect import flow, get_run_logger
from prefect.events.schemas import DeploymentTrigger


@flow
def flow_2(s3_folder, s3_path) -> None:
    """Triggering Flow 2"""
    log = get_run_logger()
    log.info("Triggering Flow 2")
    log.info(s3_folder)
    log.info(s3_path)


trigger_on_complete = DeploymentTrigger(
    name="Wait for Flow1 Upstream deployment",
    enabled=True,
    match_related={
        "prefect.resource.name": "my-first-deployment",
    },
    expect={"flow1_completion.*"},  # what events to wait for
    parameters={
        "s3_folder": "{{event.payload.s3_folder}}",
        "s3_path": "{{event.payload.s3_path}}",
    },
)


if __name__ == "__main__":
    # create your first deployment
    # repo_info.serve(name="my-first-deployment")
    flow_2.serve(name="flow2-deployment", triggers=[trigger_on_complete])
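To illustrate how the dotted event name pairs with the wildcard in `expect`, here is a rough sketch that uses Python's `fnmatch` as a stand-in for the pattern matching. That Prefect's own matching behaves like `fnmatch` here is an assumption made purely for illustration; the event names are the ones used in this thread, and `matches_expected` is a hypothetical helper.

```python
from fnmatch import fnmatchcase

# Pattern taken from the trigger's `expect` set in the script above.
EXPECTED_PATTERN = "flow1_completion.*"


def matches_expected(event_name: str, pattern: str = EXPECTED_PATTERN) -> bool:
    """Glob-style, case-sensitive check (a stand-in, not Prefect's own matcher)."""
    return fnmatchcase(event_name, pattern)


for name in ("flow1_completion.123", "flow1_completion", "flow2_completion.123"):
    print(f"{name!r} matches {EXPECTED_PATTERN!r}: {matches_expected(name)}")
```

Under this assumption, only names that start with `flow1_completion.` satisfy the pattern, which is consistent with emitting `flow1_completion.123` in the first script.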
s
Thank you...... I will check it out!!!!!
t
It's `event.payload.s3_path`, not `flow1_completion`.
`event` is just a generic name for whatever event satisfied the conditions of the trigger.
We are working on better docs for this! Thank you for bearing with us and for reaching out 🙂
s
So we just use event instead of the actual name of the event?
t
In the {{jinja templating}} yes
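A toy sketch of why the template has to start from `event` rather than the emitted event's name. The `render` function below is a hypothetical stand-in written with the standard library, not Jinja and not Prefect's renderer, and the shape of `context` is an assumption based on the payload used in this thread. Real Jinja may treat unresolved names differently from this toy, which simply returns an empty string.

```python
import re

PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template: str, context: dict) -> str:
    """Replace each {{ dotted.path }} with the value found in `context`.

    Toy stand-in for Jinja templating; unresolved paths become an empty string.
    """

    def lookup(match: re.Match) -> str:
        value = context
        for key in match.group(1).split("."):
            if not isinstance(value, dict) or key not in value:
                return ""
            value = value[key]
        return str(value)

    return PLACEHOLDER.sub(lookup, template)


# The triggering event is exposed under the generic key "event",
# whatever name the event was given when it was emitted.
context = {
    "event": {
        "event": "flow1_completion.123",
        "payload": {"s3_folder": "folder_name", "s3_path": "path_to_s3"},
    }
}

print(render("{{event.payload.s3_folder}}", context))
print(render("{{flow1_completion.payload.s3_folder}}", context))
```

The first template resolves because `event` is a key in the context; the second finds nothing because no key named `flow1_completion` exists there.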
s
Ok... Let me try this and get back to you next year!!! 😄