Ritesh
07/15/2025, 7:43 AM
Marvin
07/15/2025, 7:43 AM
Marvin
07/15/2025, 7:44 AM
```python
import asyncio

from prefect import flow, task

@task
async def my_async_task():
    # Your async code here (asyncio.sleep stands in for real async work)
    await asyncio.sleep(1)
    return "Done!"

@flow
async def my_async_flow():
    result = await my_async_task()
    return result

if __name__ == "__main__":
    asyncio.run(my_async_flow())
```
Key points about using async with Prefect:
1. Direct Async Support: You can add the `async` keyword to your flow and task definitions. Prefect runs them as coroutines, so you start an async flow with `asyncio.run(...)` or await it from other async code.
2. Mixing Sync and Async: You can mix synchronous and asynchronous tasks within the same flow. Inside an async flow, await the async tasks and call the sync tasks directly.
3. Async Task Execution: Prefect uses `AsyncTaskRunEngine` under the hood to manage async task execution efficiently.
Here's a more practical example showing how to mix async and sync tasks:
```python
import asyncio

from prefect import flow, task

@task
async def fetch_data():
    await asyncio.sleep(1)  # Simulating an async API call
    return "data"

@task
def process_data(data: str):
    return f"processed {data}"

@flow
async def main_flow():
    # Fetch data asynchronously
    data = await fetch_data()
    # Process data (sync operation)
    result = process_data(data)
    return result

if __name__ == "__main__":
    print(asyncio.run(main_flow()))
```
Best Practices:
1. Use async functions when dealing with I/O-bound operations (like API calls, database operations)
2. Keep CPU-bound operations in synchronous functions
3. Remember that all async functions must be awaited when called
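To make the I/O-bound vs CPU-bound distinction concrete, here is a minimal plain-asyncio sketch (no Prefect involved; `fake_api_call` and `cpu_heavy` are hypothetical stand-ins). Awaiting several I/O-bound coroutines together with `asyncio.gather` lets their waits overlap, while the CPU-bound function is handed to a worker thread with `asyncio.to_thread` so it does not run on the event loop thread (for true CPU parallelism you would use a process pool instead).

```python
import asyncio
import time


async def fake_api_call(i: int) -> str:
    # Hypothetical I/O-bound call; asyncio.sleep stands in for network latency
    await asyncio.sleep(0.2)
    return f"response-{i}"


def cpu_heavy(n: int) -> int:
    # Hypothetical CPU-bound work written as plain synchronous code
    return sum(k * k for k in range(n))


async def main() -> tuple:
    start = time.perf_counter()
    # The three waits overlap, so this takes about 0.2 s rather than 0.6 s
    responses = await asyncio.gather(*(fake_api_call(i) for i in range(3)))
    # Run the CPU-bound function in a worker thread instead of on the event loop
    total = await asyncio.to_thread(cpu_heavy, 1_000)
    elapsed = time.perf_counter() - start
    return responses, total, elapsed


if __name__ == "__main__":
    print(asyncio.run(main()))
```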
For more detailed information, you can check out:
- The Prefect async architecture blog post: https://www.prefect.io/blog/oss-love-letters-how-anyio-powers-prefects-async-architecture
- The flows documentation: https://docs-3.prefect.io/v3/concepts/flows
- The async utilities API reference: https://docs-3.prefect.io/v3/api-ref/python/prefect-utilities-asyncutils
Is there anything specific about async flows you'd like me to clarify further?
Ritesh
07/15/2025, 7:46 AM
Marvin
07/15/2025, 7:46 AM
Marvin
07/15/2025, 7:47 AM
```bash
# Create a work pool (if you don't have one)
prefect work-pool create "your-work-pool" --type process
# Deploy your flow
python create_deployment.py
# Start a worker so scheduled runs get executed (it keeps running; use a separate terminal)
prefect worker start --pool "your-work-pool"
```
This approach has several benefits:
1. Your original `deployment.py` remains unchanged
2. The wrapper provides Prefect observability and scheduling
3. You can add error handling and logging in the wrapper without touching the original script
4. The subprocess approach ensures proper isolation of the script execution
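The isolation point can be seen without Prefect at all. Below is a minimal sketch of the subprocess call such a wrapper relies on, using a hypothetical helper named `run_script`: it starts the target file in a fresh interpreter (`sys.executable`), layers extra environment variables over a copy of the current environment, and raises when the script exits non-zero so that a caller (for example a Prefect task with retries) actually sees the failure.

```python
import os
import subprocess
import sys
from typing import Dict, List, Optional


def run_script(path: str, args: Optional[List[str]] = None,
               extra_env: Optional[Dict[str, str]] = None) -> str:
    """Run a Python script in a separate interpreter and return its stdout."""
    env = os.environ.copy()       # inherit the current environment
    env.update(extra_env or {})   # then add or override variables
    cmd = [sys.executable, path, *(args or [])]
    result = subprocess.run(cmd, capture_output=True, text=True, env=env)
    if result.returncode != 0:
        # Surface the child's stderr so the failure is visible in logs
        raise RuntimeError(
            f"{path} exited with {result.returncode}: {result.stderr.strip()}"
        )
    return result.stdout


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        script = os.path.join(tmp, "hello.py")
        with open(script, "w") as f:
            f.write("import os, sys\nprint(os.environ['GREETING'], *sys.argv[1:])\n")
        print(run_script(script, ["world"], {"GREETING": "hello"}), end="")
```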
To customize this further, you can:
1. Add environment variables:
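```python
import os
import subprocess
import sys
from prefect import flow

@flow(name="run_deployment_script")
def run_deployment_script():
    env = os.environ.copy()
    env["CUSTOM_VAR"] = "value"
    result = subprocess.run([sys.executable, "deployment.py"],
                            env=env, capture_output=True, text=True)
    result.check_returncode()  # raise if the script failed
    return result.stdout
```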
2. Add parameters:
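```python
import subprocess
import sys
from typing import List, Optional
from prefect import flow
@flow(name="run_deployment_script")
def run_deployment_script(script_args: Optional[List[str]] = None):
    cmd = [sys.executable, "deployment.py"]
    if script_args:
        cmd.extend(script_args)
    result = subprocess.run(cmd, capture_output=True, text=True)
    return result.stdout
```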
3. Add retries or other Prefect features:
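```python
import subprocess
import sys
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

# Note: with this cache setup, repeat calls with the same inputs within an hour
# can reuse the cached stdout instead of re-running the script.
@task(retries=3, retry_delay_seconds=60,
      cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1))
def execute_script():
    result = subprocess.run([sys.executable, "deployment.py"],
                            capture_output=True,
                            text=True)
    result.check_returncode()  # a non-zero exit raises, which triggers a retry
    return result.stdout

@flow(name="run_deployment_script")
def run_deployment_script():
    return execute_script()
```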
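To see what `retries=3, retry_delay_seconds=60` means in terms of attempts, here is a simplified plain-Python model (not Prefect's implementation; `call_with_retries` is a hypothetical name). The function runs once, and each exception triggers another attempt after a delay, up to `retries` extra attempts, so `retries=3` allows at most four runs. It also shows that a retry only happens when the call raises: a script wrapper that merely returns a failed result is never retried.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(fn: Callable[[], T], retries: int = 3,
                      retry_delay_seconds: float = 60.0) -> T:
    """Call fn, retrying on any exception up to `retries` extra times."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= retries:
                raise  # out of retries: propagate the last error
            attempt += 1
            time.sleep(retry_delay_seconds)


if __name__ == "__main__":
    calls = {"n": 0}

    def flaky() -> str:
        # Fails twice, then succeeds on the third attempt
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    print(call_with_retries(flaky, retries=3, retry_delay_seconds=0), calls["n"])
```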
Would you like me to explain any part of this setup in more detail or help you with a specific scheduling pattern?
Ritesh
07/15/2025, 8:29 AM
Marvin
07/15/2025, 8:29 AM
Marvin
07/15/2025, 8:30 AM
```python
import asyncio
from pathlib import Path

from prefect import flow, get_client
from prefect.client.schemas.actions import WorkPoolCreate
from prefect.exceptions import ObjectAlreadyExists
from prefect.workers.process import ProcessWorker


async def create_work_pool(name: str) -> None:
    # 1. Create a process work pool programmatically
    async with get_client() as client:
        try:
            work_pool = await client.create_work_pool(
                work_pool=WorkPoolCreate(
                    name=name,
                    type="process",
                    base_job_template=ProcessWorker.get_default_base_job_template(),
                )
            )
            print(f"Created work pool: {work_pool.name}")
        except ObjectAlreadyExists:
            print(f"Work pool already exists: {name}")


if __name__ == "__main__":
    asyncio.run(create_work_pool("my-process-pool"))
    # 2. Create and apply the deployment
    # (Prefect 3 replaces Deployment.build_from_flow with .deploy())
    deployment_id = flow.from_source(
        source=str(Path(__file__).parent),  # Directory containing prefect_wrapper.py
        entrypoint="prefect_wrapper.py:run_deployment_script",  # The wrapper flow we created earlier
    ).deploy(
        name="scheduled-script-deployment",
        work_pool_name="my-process-pool",
        interval=3600,  # Run every hour (3600 seconds)
    )
    print(f"Created deployment with ID: {deployment_id}")
```
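File-based entrypoints in Prefect 3 take the form `path/to/file.py:function_name`. As a rough illustration of how such a string can be resolved (a simplified sketch with a hypothetical `load_entrypoint` helper, not Prefect's own loader), the file is imported with `importlib` and the named attribute is looked up. It is a handy way to check that an entrypoint string actually points at your flow before deploying.

```python
import importlib.util
from pathlib import Path
from typing import Any


def load_entrypoint(entrypoint: str, root: str = ".") -> Any:
    """Resolve 'path/to/file.py:attr' relative to root and return the attribute."""
    path_part, sep, attr = entrypoint.rpartition(":")
    if not sep or not path_part or not attr:
        raise ValueError(f"expected 'file.py:name', got {entrypoint!r}")
    file_path = Path(root) / path_part
    spec = importlib.util.spec_from_file_location(file_path.stem, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {file_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # executes the file's top-level code
    return getattr(module, attr)
```

For example, `load_entrypoint("prefect_wrapper.py:run_deployment_script")` run from the project directory would return the wrapper flow object.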
And here's the complete setup with all necessary files:
1. Your original `deployment.py` (unchanged)
2. The wrapper flow (`prefect_wrapper.py`):
```python
import subprocess
import sys
from pathlib import Path

from prefect import flow

@flow(name="run_deployment_script", log_prints=True)
def run_deployment_script():
    # Get the directory of the current script
    current_dir = Path(__file__).parent
    script_path = current_dir / "deployment.py"
    # Run the script using subprocess
    result = subprocess.run([sys.executable, str(script_path)],
                            capture_output=True,
                            text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Script failed with error: {result.stderr}")
    print(result.stdout)  # Sent to Prefect logs because log_prints=True
    return result.stdout

if __name__ == "__main__":
    run_deployment_script()
```
3. A more advanced version of `setup_deployment.py` with additional options:
```python
import asyncio
from pathlib import Path
from typing import List, Optional

from prefect import flow, get_client
from prefect.client.schemas.actions import WorkPoolCreate
from prefect.exceptions import ObjectAlreadyExists
from prefect.workers.process import ProcessWorker


async def ensure_work_pool(work_pool_name: str) -> None:
    # 1. Create or reuse the work pool
    async with get_client() as client:
        try:
            work_pool = await client.create_work_pool(
                work_pool=WorkPoolCreate(
                    name=work_pool_name,
                    type="process",
                    base_job_template=ProcessWorker.get_default_base_job_template(),
                )
            )
            print(f"Created work pool: {work_pool.name}")
        except ObjectAlreadyExists:
            print(f"Using existing work pool: {work_pool_name}")


def create_deployment(
    work_pool_name: str = "my-process-pool",
    deployment_name: str = "scheduled-script-deployment",
    schedule_interval: int = 3600,  # in seconds
    tags: Optional[List[str]] = None,
):
    asyncio.run(ensure_work_pool(work_pool_name))
    # 2. Create the deployment with more options
    return flow.from_source(
        source=str(Path(__file__).parent),
        entrypoint="prefect_wrapper.py:run_deployment_script",
    ).deploy(
        name=deployment_name,
        work_pool_name=work_pool_name,
        interval=schedule_interval,
        tags=tags or [],
    )


if __name__ == "__main__":
    create_deployment()
```
Marvin
07/15/2025, 8:30 AM
To use this setup:
1. Make sure all files are in the same directory
2. Run the setup script:
```bash
python setup_deployment.py
```
This will:
- Create a work pool (if it doesn't exist)
- Create a deployment with the specified schedule
- Set up everything needed to run your original script
You can customize the setup by:
1. Changing the schedule:
```python
# For a cron schedule instead of an interval, pass cron= to .deploy()
# in the create_deployment function (in place of interval=):
.deploy(
    name=deployment_name,
    work_pool_name=work_pool_name,
    cron="0 * * * *",  # Run at the start of every hour
)
```
2. Adding parameters:
```python
# If your script needs parameters, pass them to .deploy();
# run_deployment_script must declare matching arguments (e.g. env: str, debug: bool)
.deploy(
    # ... other parameters ...
    parameters={
        "env": "production",
        "debug": True,
    },
)
```
3. Adding infrastructure configurations:
```python
# Process work pools expose job variables such as env and working_dir,
# which take the place of the Prefect 2 Process infrastructure block
.deploy(
    # ... other parameters ...
    job_variables={
        "env": {"CUSTOM_VAR": "value"},
        "working_dir": "/path/to/working/dir",
    },
)
```
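To make the difference between an hourly interval schedule and the cron expression `0 * * * *` concrete: an interval schedule fires at fixed offsets from an anchor time, while this cron expression fires at minute 0 of every wall-clock hour. The sketch below computes the next run for each in plain Python; `next_interval_run` and `next_top_of_hour` are hypothetical helpers, and counting the interval from a fixed anchor is an assumption that roughly mirrors how Prefect's interval schedules use an anchor date.

```python
from datetime import datetime, timedelta


def next_interval_run(anchor: datetime, interval: timedelta, now: datetime) -> datetime:
    """First time >= now of the form anchor + k * interval (k >= 0)."""
    if now <= anchor:
        return anchor
    elapsed = now - anchor
    k = -(-elapsed // interval)  # ceiling division on timedeltas
    return anchor + k * interval


def next_top_of_hour(now: datetime) -> datetime:
    """First time >= now matching the cron expression '0 * * * *'."""
    top = now.replace(minute=0, second=0, microsecond=0)
    return top if top >= now else top + timedelta(hours=1)
```

With an anchor of 8:30 and the current time 10:05, the interval schedule next fires at 10:30, while the cron schedule fires at 11:00.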
The advantage of this approach is that:
1. Everything is managed through code
2. You can version control your deployment configuration
3. You can easily modify and maintain the deployment settings
4. You can create multiple deployments with different configurations
5. You can add error handling and logging
Would you like me to explain any part of this setup in more detail or help you customize it for your specific needs?
Ritesh
07/15/2025, 8:31 AM
Marvin
07/15/2025, 8:31 AM
Marvin
07/15/2025, 8:31 AM
```python
import os
import subprocess
import sys
from datetime import timedelta
from typing import Dict, Optional

from prefect import flow, task
from prefect.tasks import task_input_hash

# Note: identical env_vars within an hour can reuse the cached output instead of re-running.
@task(retries=3, retry_delay_seconds=60,
      cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1))
def execute_script(env_vars: Optional[Dict[str, str]] = None):
    # Prepare environment variables
    script_env = os.environ.copy()  # Start with current environment
    if env_vars:
        script_env.update(env_vars)
    result = subprocess.run(
        [sys.executable, "deployment.py"],
        capture_output=True,
        text=True,
        env=script_env,
    )
    if result.returncode != 0:
        raise RuntimeError(f"Script failed with error: {result.stderr}")
    return result.stdout

@flow(name="run_deployment_script")
def run_deployment_script(env_vars: Optional[Dict[str, str]] = None):
    return execute_script(env_vars=env_vars)

# Example deployment setup
if __name__ == "__main__":
    from prefect.runner.storage import GitRepository

    # Example environment variables
    deployment_env_vars = {
        "DATABASE_URL": "postgresql://user:pass@localhost:5432/db",
        "API_KEY": "your-api-key",
        "ENV": "production",
    }
    # Create the deployment from code stored in Git (replaces the Prefect 2 GitHub block)
    flow.from_source(
        source=GitRepository(
            url="https://github.com/your-username/your-repo.git",
            branch="main",  # or a specific branch
        ),
        entrypoint="path/to/your/script.py:run_deployment_script",  # Path within your repository
    ).deploy(
        name="git-based-deployment",
        work_pool_name="your-work-pool",
        parameters={"env_vars": deployment_env_vars},
    )
```
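Because `execute_script` uses `cache_key_fn=task_input_hash` with a one-hour `cache_expiration`, a second call with the same `env_vars` inside that hour can return the stored result instead of running the script again, while different `env_vars` produce a different key. Here is a simplified in-memory model of that behavior; the `InputHashCache` class is hypothetical, and Prefect's real key also incorporates the task's identity, with results persisted rather than kept in a dict.

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict, Tuple


class InputHashCache:
    """Cache results keyed by a hash of the call's inputs, with expiration."""

    def __init__(self, expiration_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.expiration_seconds = expiration_seconds
        self.clock = clock
        self._store: Dict[str, Tuple[float, Any]] = {}

    @staticmethod
    def key_for(inputs: Dict[str, Any]) -> str:
        # Stable hash: serialize inputs with sorted keys, then SHA-256
        payload = json.dumps(inputs, sort_keys=True, default=str)
        return hashlib.sha256(payload.encode()).hexdigest()

    def call(self, fn: Callable[..., Any], **inputs: Any) -> Any:
        key = self.key_for(inputs)
        now = self.clock()
        hit = self._store.get(key)
        if hit is not None and now - hit[0] < self.expiration_seconds:
            return hit[1]  # cache hit: fn is not executed
        result = fn(**inputs)
        self._store[key] = (now, result)
        return result
```

If the script must run on every scheduled execution, this behavior is a reason to drop the cache settings rather than keep them.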
For a more complete setup, here's a separate deployment configuration file (`create_deployment.py`) that handles everything programmatically:
```python
import asyncio

from prefect import flow, get_client
from prefect.client.schemas.actions import WorkPoolCreate
from prefect.exceptions import ObjectAlreadyExists
from prefect.runner.storage import GitRepository
from prefect.workers.process import ProcessWorker


async def ensure_work_pool(name: str) -> None:
    async with get_client() as client:
        try:
            work_pool = await client.create_work_pool(
                work_pool=WorkPoolCreate(
                    name=name,
                    type="process",
                    base_job_template=ProcessWorker.get_default_base_job_template(),
                )
            )
            print(f"Created work pool: {work_pool.name}")
        except ObjectAlreadyExists:
            print(f"Work pool already exists: {name}")


def create_deployment():
    # 1. Describe the Git repository (Prefect 3 replaces the GitHub block with GitRepository)
    repo = GitRepository(
        url="https://github.com/your-username/your-repo.git",
        branch="main",  # or your specific branch
        # Add authentication if needed:
        # credentials={"access_token": Secret.load("github-token")}
    )
    # 2. Define environment variables
    env_vars = {
        "DATABASE_URL": "postgresql://user:pass@localhost:5432/db",
        "API_KEY": "your-api-key",
        "ENV": "production",
        # Add any other environment variables you need
    }
    # 3. Create work pool if needed
    asyncio.run(ensure_work_pool("my-process-pool"))
    # 4. Create deployment
    return flow.from_source(
        source=repo,
        entrypoint="path/to/your/script.py:run_deployment_script",  # Path within your repository
    ).deploy(
        name="git-based-deployment",
        work_pool_name="my-process-pool",
        parameters={
            "env_vars": env_vars
        },
        # ...
    )
```
Marvin
07/15/2025, 8:31 AM
To use this setup:
1. Replace the placeholder values:
- Update the GitHub repository URL
- Set your actual environment variables
- Adjust the path and entrypoint to match your repository structure
2. If your repository is private, you'll need to add GitHub authentication:
```python
from prefect.blocks.system import Secret
from prefect.runner.storage import GitRepository

repo = GitRepository(
    url="https://github.com/your-username/your-repo.git",
    branch="main",
    credentials={"access_token": Secret.load("github-token")},  # Secret block holding your GitHub token
)
```
3. You can also store sensitive environment variables using Prefect blocks:
```python
from prefect import flow
from prefect.blocks.system import Secret

# Create and save a secret (run once, for example from your setup script)
Secret(value="your-secret-value").save("my-secret-name", overwrite=True)

# Later, in your flow (execute_script is the task defined earlier):
@flow
def run_deployment_script():
    secret = Secret.load("my-secret-name")
    env_vars = {
        "API_KEY": secret.get(),  # .get() returns the plain string value
        # other env vars...
    }
    return execute_script(env_vars=env_vars)
```
4. Run the deployment setup:
```bash
python create_deployment.py
```
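Whatever the source of the values (plain parameters or Secret blocks), `subprocess.run` requires every environment value to be a string, so passing a wrapper object such as a `SecretStr` instead of its plain string fails with a `TypeError`. A small hypothetical helper, `build_script_env`, can merge the variables over the current environment and reject non-string values up front, with an error that names the offending keys but never prints their values.

```python
import os
from typing import Dict, Mapping, Optional


def build_script_env(env_vars: Optional[Mapping[str, object]] = None,
                     base: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Merge env_vars over base (default: os.environ), requiring str values."""
    merged = dict(os.environ if base is None else base)
    bad = sorted(k for k, v in (env_vars or {}).items() if not isinstance(v, str))
    if bad:
        # Report only the key names so secret values never end up in logs
        raise TypeError(f"environment values must be str; check: {', '.join(bad)}")
    merged.update(env_vars or {})
    return merged
```

The returned dict can be passed straight to `subprocess.run(..., env=...)`.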
This enhanced version provides:
- Git-based source code management
- Environment variable handling
- Secure secrets management
- Scheduled execution
- Error handling and retries
- Caching of results
Would you like me to explain any part of this setup in more detail or help you customize it further for your specific needs?
Ritesh
07/15/2025, 8:41 AM
Marvin
07/15/2025, 8:42 AM
Marvin
07/15/2025, 8:43 AM
The `.serve()` method is supported in Prefect 3.4.4 and is the simplest way to schedule a flow: it keeps a long-lived process running on your machine that executes the runs itself. `.deploy()` is the recommended approach when you want runs to go through a work pool and be executed by workers. Here's a comparison of both approaches:
1. Using `.deploy()` (Recommended approach):
```python
from pathlib import Path

from prefect import flow

@flow
def my_flow():
    print("Hello from my flow!")

if __name__ == "__main__":
    my_flow.from_source(
        source=str(Path(__file__).parent),  # Directory containing my_flow.py
        entrypoint="my_flow.py:my_flow",
    ).deploy(
        name="my-deployment",
        work_pool_name="my-work-pool",
        interval=3600,  # optional: schedule interval in seconds
    )
```
2. Using `.serve()`:
```python
from prefect import flow

@flow
def my_flow():
    print("Hello from my flow!")

if __name__ == "__main__":
    # .serve() is a method on the flow itself; no Deployment class or work pool needed
    my_flow.serve(
        name="my-deployment",
        limit=2,  # optional: limit concurrent runs
        pause_on_shutdown=True,  # optional: pause the schedule when the process stops
    )
```
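The concurrency limit option in this example caps how many runs of the served deployment execute at the same time (two here). As a rough mental model only, and not how Prefect's runner is implemented, an `asyncio.Semaphore(2)` produces the same observable behavior: extra runs wait until a slot frees up. `fake_flow_run` and `run_with_limit` are hypothetical names.

```python
import asyncio


async def run_with_limit(n_runs: int, limit: int) -> int:
    """Start n_runs fake runs, allowing at most `limit` at once; return peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def fake_flow_run(i: int) -> None:
        nonlocal active, peak
        async with semaphore:  # waits here while `limit` runs are active
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.05)  # pretend to do work
            active -= 1

    await asyncio.gather(*(fake_flow_run(i) for i in range(n_runs)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_with_limit(n_runs=5, limit=2)))
```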
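Important considerations:
1. `.deploy()` is the better fit when you need managed infrastructure, because it:
- Sends runs to a work pool, where workers pick them up
- Lets you configure infrastructure per deployment (for example, `job_variables`)
- Supports remote code storage (such as Git) and container images
2. `.serve()` is still fully supported, but it has some limitations:
- Runs only while the serving process is alive, on the machine where you started it
- Doesn't use work pools or workers, so it can't provision infrastructure dynamically
- Is best suited to simple, single-machine scheduling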
If you're starting a new project with Prefect 3.4.4 and want runs executed by workers from a work pool, I recommend using the `.deploy()` method instead of `.serve()`. Here's a complete example using the recommended approach:
```python
from datetime import timedelta
from pathlib import Path

from prefect import flow

@flow(retries=3, retry_delay_seconds=60)
def my_flow(param1: str = "default"):
    print(f"Running with param1={param1}")

if __name__ == "__main__":
    my_flow.from_source(
        source=str(Path(__file__).parent),  # or a Git repository URL
        entrypoint="my_flow.py:my_flow",
    ).deploy(
        name="production-deployment",
        work_pool_name="my-work-pool",
        interval=timedelta(hours=1),
        parameters={"param1": "production"},
        tags=["production"],
        description="Production deployment of my flow",
    )
```
Would you like me to provide more details about specific deployment configurations or help you migrate from `.serve()` to `.deploy()`?