Piotr
07/25/2025, 8:33 AM
Marvin
07/25/2025, 8:34 AM
Marvin
07/25/2025, 8:35 AM
1. Run a Flow Directly
python
from prefect import flow

@flow
def my_flow(name: str):
    print(f"Hello {name}!")

# Run the flow directly by calling it (no deployment or worker needed)
my_flow(name="Marvin")
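If you want the run's final state rather than its return value, flows also accept a return_state keyword when called. A small sketch reusing the my_flow defined above:
python
# Returns the final State object instead of the flow's return value
state = my_flow(name="Marvin", return_state=True)
print(state)  # e.g. Completed()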
2. Trigger Flow Run from Deployment
python
from prefect import flow
from prefect.deployments import run_deployment

# Option 1: Run an existing deployment
flow_run = await run_deployment(
    name="your-flow/your-deployment-name",  # format: "flow-name/deployment-name"
    parameters={"name": "Marvin"},          # optional parameters
    tags=["test"],                          # optional tags
)

# Option 2: Create and run a deployment
@flow
def my_flow(name: str):
    print(f"Hello {name}!")

# Create the deployment from source code in a repository
my_flow.from_source(
    source="https://github.com/your-repo/your-project",
    entrypoint="path/to/file.py:my_flow",
).deploy(name="my-deployment")

# Run the deployment
flow_run = await run_deployment(
    name=f"{my_flow.name}/my-deployment",
    parameters={"name": "Marvin"},
)

# Optionally wait for the flow run to complete
from prefect.client.orchestration import wait_for_flow_run
final_state = await wait_for_flow_run(flow_run)
Key points to remember:
1. When using run_deployment(), the examples above await it, so call it from inside an async function (see the sketch after this list).
2. The deployment name format is "flow-name/deployment-name"
3. You can pass parameters, tags, and other configuration options when triggering the run
4. Use wait_for_flow_run() if you need to wait for the flow run to complete
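For point 1, a minimal async entrypoint might look like this (assuming a deployment named "your-flow/your-deployment-name" already exists on your server):
python
import asyncio
from prefect.deployments import run_deployment

async def main():
    # run_deployment is awaited here because we're inside an async function
    flow_run = await run_deployment(
        name="your-flow/your-deployment-name",  # placeholder deployment name
        parameters={"name": "Marvin"},
    )
    print(f"Created flow run: {flow_run.id}")

if __name__ == "__main__":
    asyncio.run(main())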
Since you're using PostgreSQL with the asyncpg driver and have workers deployed via the Helm chart, the deployment-based approach (Option 2) would be more appropriate for your production environment, as it will properly utilize your worker infrastructure.
Would you like me to provide more specific examples or explain any part in more detail?
Piotr
07/25/2025, 8:57 AM
Marvin
07/25/2025, 8:57 AM
Marvin
07/25/2025, 8:58 AM
1. Using the Prefect Python Client
python
from prefect.client.orchestration import get_client

async def trigger_flow_run():
    # Open a client connected to your Prefect API
    async with get_client() as client:
        # Trigger a flow run from a deployment (deployment_id expects the deployment's UUID)
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id="your-deployment-id",
            parameters={"param1": "value1"},
            tags=["api-triggered"]
        )
        print(f"Created flow run with ID: {flow_run.id}")
        return flow_run

# If you need authentication, set these before the client is created
import os
os.environ["PREFECT_API_KEY"] = "your-api-key"  # if using API key auth
# OR
os.environ["PREFECT_API_URL"] = "http://username:password@your-server:4200/api"  # if using basic auth
2. Using Raw HTTP API
python
import base64
import requests

# If using basic auth
def create_basic_auth_header(username, password):
    credentials = f"{username}:{password}"
    encoded = base64.b64encode(credentials.encode()).decode()
    return f"Basic {encoded}"

# API endpoint
PREFECT_API_URL = "http://your-prefect-server:4200/api"

# Headers
headers = {
    "Content-Type": "application/json",
    # If using basic auth:
    "Authorization": create_basic_auth_header("your-username", "your-password"),
    # If using API key:
    # "Authorization": "Bearer your-api-key",
}

# Create flow run from deployment
deployment_name = "your-flow/your-deployment"
response = requests.post(
    f"{PREFECT_API_URL}/deployments/name/{deployment_name}/create_flow_run",
    headers=headers,
    json={
        "parameters": {"param1": "value1"},
        "tags": ["api-triggered"]
    }
)

if response.status_code == 201:
    flow_run = response.json()
    print(f"Created flow run with ID: {flow_run['id']}")
else:
    print(f"Error: {response.status_code}")
    print(response.text)
3. Using curl (for testing)
bash
# Using basic auth
curl -X POST \
  "http://your-prefect-server:4200/api/deployments/name/your-flow/your-deployment/create_flow_run" \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic $(echo -n 'username:password' | base64)" \
  -d '{"parameters": {"param1": "value1"}, "tags": ["api-triggered"]}'

# Using API key
curl -X POST \
  "http://your-prefect-server:4200/api/deployments/name/your-flow/your-deployment/create_flow_run" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer your-api-key" \
  -d '{"parameters": {"param1": "value1"}, "tags": ["api-triggered"]}'
Important notes:
1. Replace your-prefect-server:4200 with your actual Prefect server address
2. You'll need to handle authentication:
- Basic auth: username/password combination
- API key: if you've configured API key authentication
3. The deployment name format is "flow-name/deployment-name"
4. The response includes the flow run ID, which you can use to track the run's status (see the polling sketch after these notes)
5. For production use, always use HTTPS and proper authentication
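As a rough sketch of note 4, you can poll the flow run by ID with the same PREFECT_API_URL and headers defined in the Raw HTTP example, stopping once it reaches a terminal state:
python
import time
import requests

# flow_run is the JSON returned by the create_flow_run request above
flow_run_id = flow_run["id"]

# Poll until the run reaches a terminal state
while True:
    r = requests.get(f"{PREFECT_API_URL}/flow_runs/{flow_run_id}", headers=headers)
    r.raise_for_status()
    state_type = r.json()["state_type"]
    print(f"Current state: {state_type}")
    if state_type in {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}:
        break
    time.sleep(5)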
Since you're using PostgreSQL and have workers deployed via the Helm chart, make sure your API requests are directed to your Prefect server's endpoint and that you have the proper authentication configured in your Helm deployment.
Would you like me to provide more details about any specific aspect, such as authentication setup or tracking the flow run status?
Piotr
07/25/2025, 9:03 AM
Marvin
07/25/2025, 9:03 AM
Marvin
07/25/2025, 9:04 AM
python
from prefect.client.orchestration import get_client
from prefect.states import Scheduled
from datetime import datetime, timezone
import asyncio

async def create_scheduled_flow_run():
    # Open a client connected to your Prefect API
    async with get_client() as client:
        # Define the scheduled time (can be past or future; must be timezone-aware)
        scheduled_time = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)  # example time

        # Resolve the deployment by name ("flow-name/deployment-name") to get its ID
        deployment = await client.read_deployment_by_name("your-flow/your-deployment")

        # Create the flow run in a Scheduled state that carries the start time
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            parameters={"param1": "value1"},
            state=Scheduled(scheduled_time=scheduled_time),
            tags=["scheduled-via-api"]
        )
        print(f"Created flow run with ID: {flow_run.id}")
        print(f"Scheduled for: {scheduled_time}")
        return flow_run

# Run the async function
async def main():
    await create_scheduled_flow_run()

# Execute
if __name__ == "__main__":
    asyncio.run(main())
Alternative approach using datetime strings:
python
from prefect.client.orchestration import get_client
from prefect.states import Scheduled
from datetime import datetime
import asyncio

async def create_scheduled_flow_run():
    async with get_client() as client:
        # Parse an ISO 8601 string into a timezone-aware datetime (this one is UTC)
        scheduled_time = datetime.fromisoformat("2024-01-01T12:00:00+00:00")

        deployment = await client.read_deployment_by_name("your-flow/your-deployment")
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=deployment.id,
            state=Scheduled(scheduled_time=scheduled_time),
            parameters={"param1": "value1"},
            tags=["scheduled-via-api"]
        )
        print(f"Created flow run with ID: {flow_run.id}")
        print(f"Scheduled for: {scheduled_time}")
        return flow_run

# Execute
if __name__ == "__main__":
    asyncio.run(create_scheduled_flow_run())
Important notes:
1. The scheduled time must be a timezone-aware datetime object. If you start from an ISO 8601 string (e.g., "2024-01-01T12:00:00Z"), parse it into a datetime first, as in the second example above.
2. When setting a past time:
- The flow run will be created with that scheduled time
- The scheduler will attempt to start the run immediately since the scheduled time has already passed
- This is useful for historical record-keeping or when you want to explicitly mark when a run was meant to occur
3. Best practices:
- Always use UTC times to avoid timezone confusion
- Make sure your datetime objects are timezone-aware
- Keep in mind that very old scheduled times might be rejected by the API
4. If you need to verify the scheduled time was set correctly:
python
# Check the flow run details after creation (inside the same async client context)
flow_run_details = await client.read_flow_run(flow_run.id)
print(f"Scheduled start time: {flow_run_details.expected_start_time}")
Since you're using PostgreSQL with workers deployed via the Helm chart, make sure your client is properly configured to connect to your Prefect server:
python
import os

# Set these before creating the client
os.environ["PREFECT_API_URL"] = "http://your-prefect-server:4200/api"

# If using authentication:
os.environ["PREFECT_API_KEY"] = "your-api-key"  # if using API key auth
# OR for basic auth (credentials embedded in the URL):
# os.environ["PREFECT_API_URL"] = "http://username:password@your-server:4200/api"
Would you like me to provide more details about any specific aspect, such as handling different timezone scenarios or error handling?