Zivile
07/22/2025, 8:04 AM
Marvin
07/22/2025, 8:04 AM
Marvin
07/22/2025, 8:06 AM
bash
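# The webhook name is a positional argument; the template defines the event the webhook emits
prefect cloud webhook create my-event-webhook \
    --description "Webhook for triggering deployment" \
    --template '{ "event": "your-event-name", "resource": { "prefect.resource.id": "your.resource.id" } }'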
b. Using the Prefect Cloud UI:
- Navigate to the Webhooks page
- Click "Create Webhook"
- Configure the webhook details
2. Configure Deployment Triggers:
In your `prefect.yaml` file, add a trigger configuration:
yaml
deployments:
  - name: your-deployment-name
    triggers:
      - type: event
        enabled: true
        expect:
          - your-event-name  # The event name to match
        parameters: {}  # Optional parameters to pass to the flow run
3. Use the Webhook:
- Each webhook gets a unique URL endpoint
- Send HTTP POST requests to this URL to trigger your deployment
- The webhook will transform the HTTP request into a Prefect event
- If the event matches your trigger configuration, it will automatically start a deployment run
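As a rough illustration of step 3, the sketch below builds the HTTP POST request a client might send to a webhook. The URL is a made-up placeholder and the payload shape is an assumption; how the request body maps onto the resulting event depends on the webhook's template.

```python
import json
import urllib.request

# Hypothetical placeholder; use the URL shown for your webhook in Prefect Cloud.
WEBHOOK_URL = "https://example.invalid/hooks/your-webhook-slug"


def build_webhook_request(url: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for a webhook URL."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_webhook_request(WEBHOOK_URL, {"key": "value"})
# To actually send it: urllib.request.urlopen(request, timeout=10)
```

Building the request separately from sending it makes the method, headers, and body easy to inspect before any network call happens.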
4. Programmatic Event Emission:
You can also emit events programmatically with the Prefect Python SDK:
python
from prefect.events import emit_event

# emit_event is a regular (synchronous) function, so it is called without await
emit_event(
    event="your-event-name",
    resource={"prefect.resource.id": "deployment/your-deployment-id"},
    payload={"key": "value"},  # Optional data to pass with the event
)
Important Notes:
1. Webhook URLs are secure and unique to your workspace
2. You can rotate webhook URLs if they're ever compromised using prefect cloud webhook rotate
3. Events can carry payload data that can be used as parameters in your flow runs
4. You can enable/disable webhooks using prefect cloud webhook toggle
Best Practices:
- Keep webhook URLs secure and treat them as secrets
- Use meaningful event names that describe the trigger action
- Consider implementing validation in your flows for webhook payload data
- Monitor webhook activity in the Prefect UI to track usage and troubleshoot issues
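One way to approach the payload validation suggested above is a small check at the start of the flow. This is a minimal sketch in plain Python; the function name `validate_payload` and the required key `batch_id` are hypothetical and not part of Prefect.

```python
from typing import Any, Set


def validate_payload(payload: Any, required_keys: Set[str]) -> dict:
    """Return the payload if it is a dict containing every required key."""
    if not isinstance(payload, dict):
        raise TypeError(f"payload must be a dict, got {type(payload).__name__}")
    missing = required_keys - set(payload)
    if missing:
        raise ValueError(f"payload is missing keys: {sorted(missing)}")
    return payload


# Example: a payload that carries the (hypothetical) required key passes through unchanged
checked = validate_payload({"batch_id": "batch_123", "source": "webhook"}, {"batch_id"})
```

Failing early with a clear error keeps malformed webhook data from reaching the rest of the flow.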
For more detailed information, you can refer to:
- Creating Deployment Triggers
- Webhooks Concept Documentation
Would you like me to elaborate on any specific aspect of this setup, such as payload handling or specific trigger configurations?
Zivile
07/22/2025, 8:29 AM
Marvin
07/22/2025, 8:29 AM
Marvin
07/22/2025, 8:30 AM
`run_deployment()` to schedule and run deployments programmatically:
python
from datetime import datetime, timedelta, timezone

from prefect.deployments import run_deployment


async def schedule_deployment_run():
    # Schedule the deployment run
    flow_run = await run_deployment(
        name="your-flow-name/your-deployment-name",  # "flow-name/deployment-name" or a deployment ID
        scheduled_time=datetime.now(timezone.utc) + timedelta(minutes=5),  # Schedule 5 minutes in the future
        parameters={"key": "value"},  # Optional: parameters to pass to the flow
        tags=["scheduled", "api-triggered"],  # Optional: tags for the flow run
        work_queue_name="default",  # Optional: specify a work queue
        flow_run_name="my-scheduled-run",  # Optional: custom name for the flow run
        timeout=0,  # Return once the run is created instead of waiting for it to finish
    )
    print(f"Created flow run '{flow_run.name}' with ID: {flow_run.id}")
    return flow_run
Key Parameters Explained:
- `name`: The deployment name (required). Can be either the deployment ID or the name in format "flow-name/deployment-name"
- `scheduled_time`: When to schedule the run (optional, defaults to immediate execution)
- `parameters`: Dictionary of parameters to pass to the flow (optional)
- `flow_run_name`: Custom name for the flow run (optional)
- `tags`: List of tags to attach to the flow run (optional)
- `work_queue_name`: Specify which work queue should pick up the run (optional)
- `idempotency_key`: Prevent duplicate runs with the same key (optional)
- `as_subflow`: Whether to run as a subflow of the current flow, if any (defaults to True)
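For the `scheduled_time` parameter, a timezone-aware value avoids ambiguity about which zone a naive datetime refers to. The helper below is a sketch under that assumption; `scheduled_time_in` is a hypothetical name, not a Prefect function.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def scheduled_time_in(minutes: int, now: Optional[datetime] = None) -> datetime:
    """Return a timezone-aware UTC datetime `minutes` after `now`."""
    base = now if now is not None else datetime.now(timezone.utc)
    if base.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    return base.astimezone(timezone.utc) + timedelta(minutes=minutes)


# Fixed reference time so the result is reproducible
reference = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
run_at = scheduled_time_in(5, now=reference)
```

The resulting value could then be passed as `scheduled_time` when creating the run.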
Example with Different Scheduling Scenarios:
python
from datetime import datetime, timezone

from prefect.deployments import run_deployment


async def schedule_deployment_examples():
    # Immediate execution
    immediate_run = await run_deployment(
        name="my-flow/my-deployment",
        parameters={"data_date": "2024-01-01"},
    )
    # Schedule for a specific time (timezone-aware)
    scheduled_run = await run_deployment(
        name="my-flow/my-deployment",
        scheduled_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
        parameters={"data_date": "2024-01-01"},
        tags=["scheduled"],
    )
    # Schedule with idempotency key to prevent duplicates
    idempotent_run = await run_deployment(
        name="my-flow/my-deployment",
        parameters={"batch_id": "batch_123"},
        idempotency_key="batch_123_processing",
    )
    return immediate_run, scheduled_run, idempotent_run
Important Notes:
1. `run_deployment()` is sync-compatible: use `await` when calling it from async code, or call it directly from synchronous code
2. The returned `FlowRun` object contains details about the run; by default the call waits for the run to finish, and `timeout=0` returns as soon as the run is created
3. If no `scheduled_time` is provided, the run is scheduled to start immediately
4. Use `idempotency_key` to prevent duplicate runs when retrying failed operations
5. The function requires appropriate permissions in your Prefect workspace
Error Handling:
python
from prefect.deployments import run_deployment
from prefect.exceptions import PrefectException


async def safe_schedule_deployment():
    try:
        flow_run = await run_deployment(
            name="my-flow/my-deployment",
            parameters={"key": "value"},
        )
        return flow_run
    except PrefectException as e:
        # Covers Prefect errors such as a deployment that cannot be found
        print(f"Failed to schedule deployment: {e}")
        raise
Would you like me to elaborate on any specific aspect of using `run_deployment()`, such as error handling, parameter passing, or scheduling strategies?
Zivile
07/22/2025, 8:32 AM
Marvin
07/22/2025, 8:32 AM
Marvin
07/22/2025, 8:33 AM
`RunDeployment` action class:
python
from uuid import UUID

from prefect.automations import Automation
from prefect.events.actions import RunDeployment
from prefect.events.schemas.automations import EventTrigger

# RunDeployment is a declarative action: it takes a deployment UUID (not a
# "flow-name/deployment-name" string) plus optional parameters. It has no
# data= argument, scheduled_time, tags, or flow_run_name, and no run() method.
deployment_action = RunDeployment(
    source="selected",
    deployment_id=UUID("00000000-0000-0000-0000-000000000000"),  # placeholder ID
    parameters={
        "param1": "value1",
        "param2": "value2",
    },
)

# Attach the action to an automation; Prefect runs it when the trigger fires
automation = Automation(
    name="run-deployment-on-event",
    trigger=EventTrigger(expect={"my-trigger-event"}, posture="Reactive", threshold=1),
    actions=[deployment_action],
).create()
A deployment can also declare an event trigger in your `prefect.yaml` file; Prefect then creates an automation that runs the deployment when a matching event arrives:
yaml
deployments:
  - name: your-deployment
    triggers:
      - type: event
        enabled: true
        expect:
          - my-trigger-event
        parameters:
          key: "value"
Important Configuration Options:
1. Basic Configuration:
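python
from uuid import UUID

from prefect.events.actions import RunDeployment

action = RunDeployment(
    source="selected",
    deployment_id=UUID("00000000-0000-0000-0000-000000000000"),  # placeholder deployment UUID
)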
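2. With Scheduling:
`RunDeployment` has no `scheduled_time` field; the run is created when the automation fires. To schedule a run for a specific time, call `run_deployment()` instead:
python
from datetime import datetime, timedelta, timezone

from prefect.deployments import run_deployment

flow_run = run_deployment(
    name="flow-name/deployment-name",
    scheduled_time=datetime.now(timezone.utc) + timedelta(minutes=5),
    timeout=0,  # return as soon as the run is created
)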
3. With Parameters:
python
from uuid import UUID

from prefect.events.actions import RunDeployment

action = RunDeployment(
    source="selected",
    deployment_id=UUID("00000000-0000-0000-0000-000000000000"),  # placeholder deployment UUID
    parameters={"key": "value"},
)
# Tags and a custom flow run name are not fields of this action
4. Integration with Event Handling:
python
from prefect.events import emit_event


def event_handler():
    # The action is never called directly. Emitting an event that matches the
    # automation's trigger is what causes its RunDeployment action to fire.
    return emit_event(
        event="my-trigger-event",
        resource={"prefect.resource.id": "deployment/your-deployment-id"},
        payload={"some": "data"},
    )
Best Practices:
1. Always include error handling:
python
from uuid import UUID

from prefect.events.actions import RunDeployment


def build_run_deployment_action(deployment_id: str) -> RunDeployment:
    try:
        # UUID() raises ValueError for a malformed ID, and the model's
        # validation errors are ValueError subclasses as well
        return RunDeployment(source="selected", deployment_id=UUID(deployment_id))
    except ValueError as e:
        print(f"Invalid RunDeployment configuration: {e}")
        raise
2. Use meaningful tags and names for better observability
3. Consider using environment variables for deployment IDs
4. Include relevant metadata in the parameters
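For the suggestion to keep deployment IDs in environment variables, a minimal sketch might look like the following. The variable name `MY_DEPLOYMENT_ID` and the helper `deployment_id_from_env` are hypothetical, chosen only for this example.

```python
import os
from uuid import UUID

# Hypothetical variable name chosen for this example.
ENV_VAR = "MY_DEPLOYMENT_ID"


def deployment_id_from_env(var_name: str = ENV_VAR) -> UUID:
    """Read a deployment ID from the environment and parse it as a UUID."""
    raw = os.environ.get(var_name)
    if raw is None:
        raise RuntimeError(f"environment variable {var_name} is not set")
    return UUID(raw)  # raises ValueError if the value is not a valid UUID
```

Parsing the value as a UUID at startup surfaces a missing or malformed ID immediately rather than when an action is first used.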
Would you like me to elaborate on any specific aspect of using `RunDeployment`, such as more complex configuration options or integration with other Prefect features?
Zivile
07/22/2025, 8:39 AM
Marvin
07/22/2025, 8:39 AM
Marvin
07/22/2025, 8:40 AM
python
from uuid import UUID

from prefect.events.actions import RunDeployment

# Create a RunDeployment action (it takes the deployment's UUID, not its name)
action = RunDeployment(
    source="selected",
    deployment_id=UUID("00000000-0000-0000-0000-000000000000"),  # placeholder ID
)
# The action has no run() method; add it to an automation's actions list and
# Prefect executes it when the automation's trigger fires.
You can also configure this in your `prefect.yaml` as a trigger on the deployment:
yaml
deployments:
  - name: your-deployment-name
    triggers:
      - type: event
        enabled: true
        expect:
          - my-trigger-event
However, I notice that the documentation and available information about `RunDeployment` is limited. For more reliable and well-documented ways to schedule deployments, I recommend using one of these approaches:
1. Using the Prefect API directly:
python
from datetime import datetime, timedelta, timezone

from prefect.deployments import run_deployment


async def schedule_deployment():
    return await run_deployment(
        name="your-flow/your-deployment-name",
        scheduled_time=datetime.now(timezone.utc) + timedelta(minutes=5),
        parameters={"key": "value"},
        timeout=0,  # return as soon as the run is created
    )
2. Using Schedules in deployment configuration:
python
from prefect import flow


@flow
def my_flow():
    print("Flow running!")


if __name__ == "__main__":
    # serve() registers the deployment with a cron schedule and keeps this
    # process running to execute the scheduled runs
    my_flow.serve(
        name="scheduled-deployment",
        cron="0 0 * * *",  # Run daily at midnight
    )
Would you like me to provide more details about any of these scheduling approaches? Also, if you have a specific use case in mind for `RunDeployment`, please let me know and I can help find the most appropriate solution for your needs.