<@ULVA73B9P> how to trigger a flow run using pytho...
# ask-marvin
p
@Marvin how to trigger a flow run using python sdk?
m
thought for 79.6 seconds
Based on the research, I'll show you how to trigger a flow run in Prefect 3.x using both available methods: 1. Direct Flow Run (Local Execution)
Copy code
python
from prefect import flow

@flow
def my_flow(name: str):
    print(f"Hello {name}!")

# Run the flow directly
flow_run = my_flow.run(name="Marvin")
2. Trigger Flow Run from Deployment
Copy code
python
from prefect.deployments import run_deployment
from prefect import flow

# Option 1: Run an existing deployment
flow_run = await run_deployment(
    name="your-flow/your-deployment-name",  # format: "flow-name/deployment-name"
    parameters={"name": "Marvin"},  # optional parameters
    tags=["test"],  # optional tags
)

# Option 2: Create and run a deployment
@flow
def my_flow(name: str):
    print(f"Hello {name}!")

# Create deployment
deployment = my_flow.from_source(
    source="<https://github.com/your-repo/your-project>",
    entrypoint="path/to/file.py:my_flow"
).deploy(name="my-deployment")

# Run the deployment
flow_run = await run_deployment(
    name=f"{my_flow.name}/my-deployment",
    parameters={"name": "Marvin"}
)

# Optionally wait for the flow run to complete
from prefect.client.orchestration import wait_for_flow_run
final_state = await wait_for_flow_run(flow_run)
Key points to remember: 1. When using
run_deployment()
, the function is async, so you need to use
await
and run it in an async context. 2. The deployment name format is "flow-name/deployment-name" 3. You can pass parameters, tags, and other configuration options when triggering the run 4. Use
wait_for_flow_run()
if you need to wait for the flow run to complete Since you're using PostgreSQL with asyncpg driver and have workers deployed via Helm chart, the deployment-based approach (Option 2) would be more appropriate for your production environment, as it will properly utilize your worker infrastructure. Would you like me to provide more specific examples or explain any part in more detail?
p
@Marvin is it possible to trigger it externally, through API, possibly using python sdk, if not then just using raw api?
m
thought for 52.1 seconds
Let me show you both approaches to trigger flow runs externally: 1. Using Python SDK Client
Copy code
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import FlowRun

async def trigger_flow_run():
    # Create a client instance
    client = await get_client()
    
    # Trigger a flow run from a deployment
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id="your-deployment-id",  # or deployment name
        parameters={"param1": "value1"},
        tags=["api-triggered"]
    )
    
    print(f"Created flow run with ID: {flow_run.id}")
    return flow_run

# If you need authentication
import os
os.environ["PREFECT_API_KEY"] = "your-api-key"  # if using API key auth
# OR
os.environ["PREFECT_API_URL"] = "<http://username:password@your-server:4200/api>"  # if using basic auth
2. Using Raw HTTP API
Copy code
python
import requests
import json

# If using basic auth
import base64

def create_basic_auth_header(username, password):
    credentials = f"{username}:{password}"
    encoded = base64.b64encode(credentials.encode()).decode()
    return f"Basic {encoded}"

# API endpoint
PREFECT_API_URL = "<http://your-prefect-server:4200/api>"

# Headers
headers = {
    "Content-Type": "application/json",
    # If using basic auth:
    "Authorization": create_basic_auth_header("your-username", "your-password"),
    # If using API key:
    # "Authorization": f"Bearer your-api-key"
}

# Create flow run from deployment
deployment_name = "your-flow/your-deployment"
response = <http://requests.post|requests.post>(
    f"{PREFECT_API_URL}/deployments/name/{deployment_name}/create_flow_run",
    headers=headers,
    json={
        "parameters": {"param1": "value1"},
        "tags": ["api-triggered"]
    }
)

if response.status_code == 201:
    flow_run = response.json()
    print(f"Created flow run with ID: {flow_run['id']}")
else:
    print(f"Error: {response.status_code}")
    print(response.text)
3. Using
curl
(for testing)
Copy code
bash
# Using basic auth
curl -X POST \
  "<http://your-prefect-server:4200/api/deployments/name/your-flow/your-deployment/create_flow_run>" \
  -H "Content-Type: application/json" \
  -H "Authorization: Basic $(echo -n 'username:password' | base64)" \
  -d '{"parameters": {"param1": "value1"}, "tags": ["api-triggered"]}'

# Using API key
curl -X POST \
  "<http://your-prefect-server:4200/api/deployments/name/your-flow/your-deployment/create_flow_run>" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer your-api-key" \
  -d '{"parameters": {"param1": "value1"}, "tags": ["api-triggered"]}'
Important notes: 1. Replace
your-prefect-server:4200
with your actual Prefect server address 2. You'll need to handle authentication: - Basic auth: username/password combination - API key: if you've configured API key authentication 3. The deployment name format is "flow-name/deployment-name" 4. The response will include the flow run ID which you can use to track the run's status 5. For production use, always use HTTPS and proper authentication Since you're using PostgreSQL and have workers deployed via Helm chart, make sure your API requests are directed to your Prefect server's endpoint and that you have the proper authentication configured in your Helm deployment. Would you like me to provide more details about any specific aspect, such as authentication setup or tracking the flow run status?
p
@Marvin how to trigger a flow run this way, with python sdk, and with a specific "scheduled time"? I was able to set it to past time from UI by scheduling a flow run into past, i want to achieve it by external run
m
thought for 66.3 seconds
Let me show you how to trigger a flow run with a specific scheduled time using the Python SDK:
Copy code
python
from prefect.client.orchestration import get_client
from datetime import datetime, timezone
import asyncio

async def create_scheduled_flow_run():
    # Initialize the client
    client = await get_client()
    
    # Define the scheduled time (can be past or future)
    scheduled_time = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)  # Example time
    
    # Create the flow run
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id="your-deployment-id",  # or deployment name like "flow-name/deployment-name"
        parameters={"param1": "value1"},
        scheduled_time=scheduled_time,
        tags=["scheduled-via-api"]
    )
    
    print(f"Created flow run with ID: {flow_run.id}")
    print(f"Scheduled for: {flow_run.scheduled_start_time}")
    return flow_run

# Run the async function
async def main():
    await create_scheduled_flow_run()

# Execute
if __name__ == "__main__":
    asyncio.run(main())
Alternative approach using datetime strings:
Copy code
python
from prefect.client.orchestration import get_client
import asyncio

async def create_scheduled_flow_run():
    client = await get_client()
    
    # ISO 8601 formatted string
    scheduled_time = "2024-01-01T12:00:00Z"  # UTC time
    
    flow_run = await client.create_flow_run_from_deployment(
        deployment_id="your-flow/your-deployment",
        scheduled_time=scheduled_time,
        parameters={"param1": "value1"},
        tags=["scheduled-via-api"]
    )
    
    print(f"Created flow run with ID: {flow_run.id}")
    print(f"Scheduled for: {flow_run.scheduled_start_time}")
    return flow_run

# Execute
if __name__ == "__main__":
    asyncio.run(create_scheduled_flow_run())
Important notes: 1. The
scheduled_time
can be provided either as: - A
datetime
object (must be timezone-aware) - An ISO 8601 formatted string (e.g., "2024-01-01T120000Z") 2. When setting a past time: - The flow run will be created with that scheduled time - The scheduler will attempt to start the run immediately since the scheduled time has already passed - This is useful for historical record-keeping or when you want to explicitly mark when a run was meant to occur 3. Best practices: - Always use UTC times to avoid timezone confusion - Make sure your datetime objects are timezone-aware - Keep in mind that very old scheduled times might be rejected by the API 4. If you need to verify the scheduled time was set correctly:
Copy code
python
# Check the flow run details after creation
flow_run_details = await client.read_flow_run(flow_run.id)
print(f"Scheduled start time: {flow_run_details.scheduled_start_time}")
Since you're using PostgreSQL with workers deployed via Helm chart, make sure your client is properly configured to connect to your Prefect server:
Copy code
python
import os

# Set these before creating the client
os.environ["PREFECT_API_URL"] = "<http://your-prefect-server:4200/api>"
# If using authentication:
os.environ["PREFECT_API_KEY"] = "your-api-key"  # if using API key auth
# OR for basic auth:
os.environ["PREFECT_API_URL"] = "<http://username:password@your-server:4200/api>"
Would you like me to provide more details about any specific aspect, such as handling different timezone scenarios or error handling?