Yu Shao Pang
07/02/2025, 9:40 AMMarvin
07/02/2025, 9:40 AMMarvin
07/02/2025, 9:41 AM
1. Old Way (2.x):
python
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-deployment",
    ...
)
2. New Way (3.x):
python
from prefect import flow

@flow
def my_flow():
    ...

if __name__ == "__main__":
    my_flow.from_source(
        source="https://github.com/your-org/your-repo",
        entrypoint="path/to/flow.py:my_flow"
    ).deploy(
        name="my-deployment",
        work_pool_name="my-work-pool"
    )
Key changes to note:
1. The entire `Deployment` class has been removed. Instead, deployment functionality is now a method on the flow object itself.
2. CLI changes:
   - ❌ `prefect deployment build` (removed)
   - ✅ `prefect deploy` (new command)
3. Configuration is now primarily handled through `prefect.yaml` files, which provide a cleaner way to manage deployment settings.
4. Work pools and workers replace the old agent system. You'll need to:
   - Create a work pool (`prefect work-pool create`)
   - Start a worker (`prefect worker start -p "my-work-pool"`)
For a complete migration, you'll want to:
1. Update your deployment scripts to use the new `.from_source().deploy()` pattern
2. Set up appropriate work pools for your deployment targets
3. Replace any agent configurations with worker configurations
4. Update your CI/CD pipelines to use `prefect deploy` instead of `prefect deployment build`
You can find the complete migration guide in the Prefect 3.x documentation.
Would you like me to provide more specific examples for any part of this migration process?Yu Shao Pang
07/02/2025, 9:47 AMMarvin
07/02/2025, 9:47 AMMarvin
07/02/2025, 9:48 AM
| 2.x Parameter | 3.x Parameter | Status |
| --- | --- | --- |
| `name` | `name` | ✅ Still used, works the same way |
| `storage` | N/A | ❌ Removed, now handled via `flow.from_source()` |
| `infrastructure` | `work_pool_name` | 🔄 Replaced with work pool concept |
| `infra_overrides` | `job_variables` | 🔄 Renamed and slightly different usage |
| `tags` | `tags` | ✅ Still used, works the same way |
| `version` | N/A | ❌ Removed, now handled automatically |
| `schedule` | `schedule` | ✅ Still used, works similarly |
| `parameters` | `parameters` | ✅ Still used, works the same way |
### New 3.x Parameters
- `work_pool_name`: Specifies which work pool will run the deployment
- `image`: For container-based deployments
- `build`: Boolean to control if Docker image should be built
- `push`: Boolean to control if Docker image should be pushed
- `enforce_parameter_schema`: New option for parameter validation
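To make those new options concrete, here is a minimal sketch of a container-based deployment using them (the image, registry, and work pool names are placeholders, not values from your project):
python
# Hedged sketch: a Docker-based deployment exercising the new 3.x parameters
my_flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="path/to/flow.py:my_flow"
).deploy(
    name="my-docker-deployment",
    work_pool_name="my-docker-pool",
    image="my-registry/my-image:latest",  # container image used for flow runs
    build=True,                           # build the image during deploy
    push=True,                            # push the built image to the registry
    enforce_parameter_schema=True         # validate run parameters against the flow signature
)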
### Example Comparison:
python
# 2.x (OLD, no longer works)
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-deployment",
    infrastructure=DockerContainer.load("my-container"),
    infra_overrides={"env": {"MY_VAR": "value"}},
    tags=["prod"],
    parameters={"param1": "value1"}
)

# 3.x (NEW)
my_flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="path/to/flow.py:my_flow"
).deploy(
    name="my-deployment",
    work_pool_name="my-docker-pool",
    job_variables={"env": {"MY_VAR": "value"}},
    tags=["prod"],
    parameters={"param1": "value1"}
)
### Important Notes:
1. Configuration Location: Many deployment settings that were previously in code are now recommended to be placed in `prefect.yaml` files.
2. Infrastructure Management: Instead of infrastructure blocks, you now use:
   - Work pools for infrastructure configuration
   - Job variables for runtime overrides
   - Work queue configuration for execution control
3. Storage Changes: Rather than using storage blocks, source code location is now specified using `from_source()`, which supports:
   - Git repositories
   - Local files
   - Remote URLs
4. Schedules: While schedules still exist, they're now typically defined in `prefect.yaml` or using the UI, though they can still be specified in code (see the sketch below).
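If you do keep a schedule in code, a minimal sketch looks like this (the cron string and names are just examples):
python
# Hedged sketch: .deploy() accepts schedule arguments such as cron
my_flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="path/to/flow.py:my_flow"
).deploy(
    name="my-scheduled-deployment",
    work_pool_name="my-work-pool",
    cron="0 9 * * *"  # run every day at 09:00
)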
Would you like me to elaborate on any specific parameter or provide examples of how to migrate specific deployment configurations?Yu Shao Pang
07/02/2025, 10:17 AMMarvin
07/02/2025, 10:17 AMMarvin
07/02/2025, 10:18 AM
### 1. Creating Deployments
python
# Old way (2.x)
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-deployment",
    parameters={"param1": "value1"},
    tags=["prod"]
)
deployment.apply()

# New way (3.x)
@flow
def my_flow(param1: str = "value1"):
    ...

if __name__ == "__main__":
    my_flow.from_source(
        source="https://github.com/your-org/your-repo",
        entrypoint="path/to/flow.py:my_flow"
    ).deploy(
        name="my-deployment",
        tags=["prod"]
    )
### 2. Loading/Accessing Deployments
python
# Old way (2.x)
deployment = Deployment.load("flow-name/deployment-name")
deployment.update()

# New way (3.x)
from prefect.client import get_client

async def get_deployment():
    client = get_client()
    deployment = await client.read_deployment_by_name(
        "flow-name/deployment-name"
    )
    return deployment
### 3. Running Deployments
python
# Old way (2.x)
deployment = Deployment.load("flow-name/deployment-name")
deployment.run()

# New way (3.x)
from prefect.deployments import run_deployment

# Synchronous
run_deployment(
    name="flow-name/deployment-name",
    parameters={"param1": "new_value"}
)

# Asynchronous
await run_deployment(
    name="flow-name/deployment-name",
    parameters={"param1": "new_value"}
)
### 4. Managing Deployment Configuration
The preferred way in 3.x is to use `prefect.yaml`:
yaml
# prefect.yaml
deployments:
  - name: my-deployment
    entrypoint: flow.py:my_flow
    work_pool:
      name: my-work-pool
    schedule:
      interval: 3600
    parameters:
      param1: value1
    tags:
      - prod
Then deploy using CLI:
bash
prefect deploy
### 5. Programmatic Deployment Management
If you need to manage deployments programmatically:
python
# 3.x way
from prefect.client import get_client

async def manage_deployment():
    client = get_client()
    # List deployments
    deployments = await client.read_deployments()
    # Read specific deployment
    deployment = await client.read_deployment_by_name("flow-name/deployment-name")
    # Delete deployment
    await client.delete_deployment("deployment-id")
### Key Points for Refactoring:
1. Move Configuration to YAML:
   - Shift deployment configuration to `prefect.yaml` files where possible
   - This is the recommended approach for managing deployments in 3.x
2. Use Client API:
   - For programmatic operations, use the Prefect client API
   - Most operations are async by default
3. Deployment Creation:
   - Use `flow.from_source().deploy()` for creating deployments
   - Or use the `prefect deploy` CLI command with `prefect.yaml`
4. Running Deployments:
   - Use the `run_deployment()` function instead of deployment object methods
   - Supports both sync and async execution
5. Work Pools:
   - Replace infrastructure configurations with work pool settings
   - Configure work pools via UI or CLI
Would you like me to provide more specific examples for any particular deployment operations you're trying to refactor?Yu Shao Pang
07/02/2025, 10:20 AMMarvin
07/02/2025, 10:20 AMMarvin
07/02/2025, 10:21 AM
### Option 1: Keep a Deployment-like Proxy
python
from prefect.client import get_client
from prefect.deployments import run_deployment

class DeploymentProxy:
    def __init__(self, name, parameters=None, tags=None):
        self.name = name
        self.parameters = parameters or {}
        self.tags = tags or []

    def apply(self):
        # Deploy the flow when apply is called
        return self.flow.from_source(
            source=".",  # Assumes local source
            entrypoint="path/to/your/flow.py:my_flow"
        ).deploy(
            name=self.name,
            parameters=self.parameters,
            tags=self.tags
        )

def build_deployment(flow, name, parameters=None, tags=None):
    proxy = DeploymentProxy(
        name=name,
        parameters=parameters,
        tags=tags
    )
    proxy.flow = flow
    return proxy

# Your existing code stays almost the same:
deployment = build_deployment(
    flow=my_flow,
    name="my-deployment",
    parameters={"param1": "value1"},
    tags=["prod"]
)
return deployment
### Option 2: Direct Return of Deployment Name (More Pythonic for 3.x)
python
def build_deployment(flow, name, parameters=None, tags=None):
    deployment_name = flow.from_source(
        source=".",  # Assumes local source
        entrypoint="path/to/your/flow.py:my_flow"
    ).deploy(
        name=name,
        parameters=parameters or {},
        tags=tags or []
    )
    return deployment_name

# Usage remains similar:
deployment = build_deployment(
    flow=my_flow,
    name="my-deployment",
    parameters={"param1": "value1"},
    tags=["prod"]
)
return deployment
Important Notes:
1. Option 1 maintains the most similar interface but adds complexity
2. Option 2 is simpler but changes the return type
3. Both options require you to specify the `entrypoint` parameter
4. You'll need to adjust the `source` parameter based on where your flow code lives

The key differences from 2.x:
- The deployment is created immediately rather than being built and applied separately
- There's no direct equivalent to the Deployment object
- You'll need to specify where your flow code lives via the `source` and `entrypoint` parameters
Would you like me to explain more about either option or provide examples of how to use the returned deployment in your downstream code?Yu Shao Pang
07/02/2025, 10:28 AMMarvin
07/02/2025, 10:28 AMMarvin
07/02/2025, 10:28 AMload_existing
is not a valid parameter in Prefect 3.x. The concept of loading existing deployments has been replaced with a different approach.
If you need to work with existing deployments in 3.x, you have these options:
1. Read/Load an existing deployment:
python
from prefect.client import get_client

async def get_existing_deployment(name):
    client = get_client()
    deployment = await client.read_deployment_by_name(name)
    return deployment
2. Update an existing deployment:
python
# Using the CLI (recommended approach):
#   prefect deploy

# Or programmatically:
flow.from_source(
    source=".",
    entrypoint="path/to/flow.py:my_flow"
).deploy(
    name="existing-deployment-name"  # If the name exists, it will update the deployment
)
3. Check if a deployment exists:
python
from prefect.client import get_client

async def deployment_exists(name):
    client = get_client()
    try:
        await client.read_deployment_by_name(name)
        return True
    except Exception:
        return False
The key difference in 3.x is that deployments are more declarative - you specify what you want the deployment to look like, and Prefect handles creating or updating it accordingly. There's no need for explicit `load_existing` functionality because:
1. If you deploy with an existing name, it updates the deployment
2. If you need to read an existing deployment, you use the client API
3. If you need to modify an existing deployment, you can use the client API or simply deploy again with updated parameters (see the sketch below)
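As a minimal sketch of that create-or-update behavior (names and parameter values are placeholders):
python
# First call creates the deployment
my_flow.from_source(
    source=".",
    entrypoint="path/to/flow.py:my_flow"
).deploy(name="my-deployment", parameters={"param1": "value1"})

# A later call with the same name updates the existing deployment in place
my_flow.from_source(
    source=".",
    entrypoint="path/to/flow.py:my_flow"
).deploy(name="my-deployment", parameters={"param1": "value2"})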
Would you like me to provide more detailed examples of any of these approaches?Yu Shao Pang
07/02/2025, 10:31 AMsource
in prefect 2?Marvin
07/02/2025, 10:31 AMMarvin
07/02/2025, 10:32 AM
There was no direct `source` parameter in Prefect 2.x's `Deployment.build_from_flow()`. Instead, Prefect 2.x used the `storage` parameter with storage blocks. Here's the comparison:
python
# Prefect 2.x way
from prefect.deployments import Deployment
from prefect.filesystems import GitHub

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-deployment",
    storage=GitHub.load("my-github-block"),  # Used storage parameter
    parameters={"param1": "value1"}
)

# Prefect 3.x way
my_flow.from_source(
    source="https://github.com/owner/repo",  # Uses source parameter
    entrypoint="path/to/flow.py:my_flow"
).deploy(
    name="my-deployment",
    parameters={"param1": "value1"}
)
The key differences:
- 2.x used the `storage` parameter with storage blocks (GitHub, S3, Azure, etc.)
- 3.x uses the `source` parameter with direct URLs or paths
- Storage blocks are removed in 3.x in favor of the simpler `source` parameter

If you're migrating code, you'll need to:
1. Remove references to storage blocks
2. Replace `storage=...` with the appropriate `source="..."` in the new deployment pattern
3. Add the required `entrypoint` parameter to specify where your flow is located
Would you like me to provide examples of how to migrate specific storage block configurations to the new source
parameter format?Yu Shao Pang
07/02/2025, 10:33 AMMarvin
07/02/2025, 10:33 AMMarvin
07/02/2025, 10:34 AM
python
from prefect.client import get_client

async def convert_storage_to_source(block_name: str) -> str:
    """Convert a 2.x storage block to a 3.x source URL."""
    client = get_client()
    try:
        # Get the block document
        block_doc = await client.read_block_document_by_name(block_name)
        # Extract information based on block type
        block_type = block_doc.block_type_name
        if "github" in block_type.lower():
            # Extract GitHub details
            data = block_doc.data
            repository = data.get("repository")
            reference = data.get("reference", "main")
            return f"https://github.com/{repository}@{reference}"
        elif "s3" in block_type.lower():
            data = block_doc.data
            bucket = data.get("bucket_name")
            key = data.get("base_path", "")
            return f"s3://{bucket}/{key}"
        elif "azure" in block_type.lower():
            data = block_doc.data
            container = data.get("container")
            return f"azure://{container}"
        else:
            return "."  # Local filesystem default
    except Exception as e:
        print(f"Could not convert block {block_name}: {e}")
        return "."

# Usage example:
async def migrate_deployment():
    old_storage_block = "my-github-block"
    source_url = await convert_storage_to_source(old_storage_block)
    my_flow.from_source(
        source=source_url,
        entrypoint="path/to/flow.py:my_flow"
    ).deploy(
        name="my-deployment"
    )
Important notes:
1. This is a basic conversion - you might need to add more logic based on your specific storage block configurations
2. You'll still need to specify the `entrypoint` parameter manually as this information isn't typically stored in the storage block
3. Some storage block types might require different URL formats
4. Local filesystem storage blocks typically convert to "." as the source
Common conversions:
python
# GitHub Storage Block -> Source URL
"github/myorg/myrepo" -> "https://github.com/myorg/myrepo"

# S3 Storage Block -> Source URL
"s3/mybucket/path" -> "s3://mybucket/path"

# Azure Storage Block -> Source URL
"azure/mycontainer" -> "azure://mycontainer"

# Local Storage Block -> Source URL
"local/path" -> "."
Would you like me to provide more specific examples for certain storage block types you're using?Yu Shao Pang
07/02/2025, 10:35 AMMarvin
07/02/2025, 10:36 AMMarvin
07/02/2025, 10:36 AM
python
from typing import Optional
from prefect.blocks.core import Block

def convert_block_to_source(storage: Optional[Block]) -> str:
    """Convert a storage block to a source string, or return "." if no storage provided."""
    if storage is None:
        return "."
    # Get block type name
    block_type = storage.block_type_name.lower()
    if "github" in block_type:
        # GitHub storage block
        repository = getattr(storage, "repository", "")
        reference = getattr(storage, "reference", "main")
        return f"https://github.com/{repository}@{reference}"
    elif "s3" in block_type:
        # S3 storage block
        bucket = getattr(storage, "bucket_name", "")
        key = getattr(storage, "base_path", "")
        return f"s3://{bucket}/{key}".rstrip('/')
    elif "azure" in block_type:
        # Azure storage block
        container = getattr(storage, "container", "")
        return f"azure://{container}"
    elif "gcs" in block_type:
        # Google Cloud Storage block
        bucket = getattr(storage, "bucket", "")
        key = getattr(storage, "base_path", "")
        return f"gs://{bucket}/{key}".rstrip('/')
    else:
        # Default to local filesystem
        return "."

# Usage example:
storage_block = None  # or your actual storage block
source = convert_block_to_source(storage_block)

my_flow.from_source(
    source=source,
    entrypoint="path/to/flow.py:my_flow"
).deploy(
    name="my-deployment"
)
The function:
1. Handles the `None` case by returning "." (local filesystem)
2. Uses `getattr` with defaults to safely access block attributes
3. Supports common storage block types (GitHub, S3, Azure, GCS)
4. Falls back to local filesystem for unknown block types
You can use it like this:
python
# With a GitHub block
github_block = GitHub.load("my-github-block")
source = convert_block_to_source(github_block)
# Result: "https://github.com/owner/repo@main"

# With an S3 block
s3_block = S3.load("my-s3-block")
source = convert_block_to_source(s3_block)
# Result: "s3://my-bucket/my-path"

# With no block
source = convert_block_to_source(None)
# Result: "."
Would you like me to add support for any specific storage block types you're using?Yu Shao Pang
07/02/2025, 10:37 AMMarvin
07/02/2025, 10:37 AMMarvin
07/02/2025, 10:38 AMpython
from typing import Optional
from prefect.blocks.core import Block

def convert_block_to_source(storage: Optional[Block]) -> str:
    """Convert a storage block to a source string, or return "." if no storage provided."""
    if storage is None:
        return "."
    # Get block type name
    block_type = storage.block_type_name.lower()
    if "gitlab" in block_type:
        # GitLab storage block
        repository = getattr(storage, "repository", "")
        reference = getattr(storage, "reference", "main")
        # GitLab repositories can be stored as a full URL or a bare path:
        if repository.startswith("http"):
            return f"{repository}@{reference}"  # If full URL is provided
        else:
            return f"https://gitlab.com/{repository}@{reference}"  # If just path is provided
    elif "github" in block_type:
        repository = getattr(storage, "repository", "")
        reference = getattr(storage, "reference", "main")
        return f"https://github.com/{repository}@{reference}"
    elif "s3" in block_type:
        bucket = getattr(storage, "bucket_name", "")
        key = getattr(storage, "base_path", "")
        return f"s3://{bucket}/{key}".rstrip('/')
    elif "azure" in block_type:
        container = getattr(storage, "container", "")
        return f"azure://{container}"
    else:
        return "."

# Usage examples:
# For GitLab:
# If your block contains "myorg/myrepo" as repository
# Result: "https://gitlab.com/myorg/myrepo@main"
# If your block contains "https://gitlab.company.com/myorg/myrepo" as repository
# Result: "https://gitlab.company.com/myorg/myrepo@main"
In Prefect 3.x, GitLab repositories can be used as sources in these formats:
1. https://gitlab.com/owner/repo@branch
2. https://gitlab.company.com/owner/repo@branch (for self-hosted GitLab)
For example:
python
my_flow.from_source(
    source="https://gitlab.com/myorg/myrepo@main",
    entrypoint="path/to/flow.py:my_flow"
).deploy(
    name="my-deployment"
)
If you're using a self-hosted GitLab instance or have specific GitLab block attributes that differ from these assumptions, let me know and I can adjust the conversion logic accordingly.Yu Shao Pang
07/02/2025, 12:20 PMMarvin
07/02/2025, 12:20 PMMarvin
07/02/2025, 12:20 PM
The Deployment.load_from_yaml method has been removed in Prefect 3.x. Here are your options for handling YAML-based deployments:
1. Preferred Method (CLI):
bash
prefect deploy --prefect-file path/to/your/prefect.yaml
2. Programmatic Alternative:
python
from prefect.deployments import deploy
# Deploy using a YAML file
result = deploy(prefect_file="path/to/your/prefect.yaml")
The YAML file format has also changed. Here's the new format for `prefect.yaml`:
yaml
# prefect.yaml
deployments:
  - name: my-deployment
    entrypoint: "path/to/flow.py:my_flow"
    work_pool:
      name: my-work-pool
    schedule:
      interval: 3600
    parameters:
      param1: value1
    tags:
      - prod
If you're migrating existing code that uses `Deployment.load_from_yaml`, you'll need to:
1. Update your YAML files to the new format
2. Replace the loading code with either:
   - The CLI command `prefect deploy`
   - The programmatic `deploy()` function
For example:
python
# Old way (2.x) - No longer works
from prefect.deployments import Deployment
deployment = Deployment.load_from_yaml("deployment.yaml")

# New way (3.x)
from prefect.deployments import deploy

# Option 1: Deploy directly
result = deploy(prefect_file="prefect.yaml")

# Option 2: More granular control
from prefect import flow

@flow
def my_flow():
    pass

if __name__ == "__main__":
    my_flow.from_source(
        source=".",  # or your repository URL
        entrypoint="path/to/flow.py:my_flow"
    ).deploy(
        name="my-deployment",
        work_pool_name="my-work-pool"
    )
Would you like me to provide more details about the new YAML format or how to migrate specific YAML configurations from 2.x to 3.x?Marvin
07/02/2025, 12:31 PMYu Shao Pang
07/02/2025, 12:32 PMMarvin
07/02/2025, 12:32 PMMarvin
07/02/2025, 12:33 PM
Process from prefect.infrastructure has been removed. Instead, you should use work pools with the "process" type. Here's how to migrate:
python
# Old way (2.x) - No longer works
from prefect.infrastructure import Process
process_infra = Process(...)

# New way (3.x)
# First, create a process work pool (can be done via CLI or UI):
#   prefect work-pool create my-process-pool --type process

# Then use it in your deployment:
my_flow.from_source(
    source=".",
    entrypoint="path/to/flow.py:my_flow"
).deploy(
    name="my-deployment",
    work_pool_name="my-process-pool"
)
If you're setting this up for the first time:
1. Create a process work pool:
bash
prefect work-pool create my-process-pool --type process
2. Start a worker:
bash
prefect worker start --pool my-process-pool
3. Deploy your flow:
python
from prefect import flow

@flow
def my_flow():
    pass

if __name__ == "__main__":
    my_flow.from_source(
        source=".",
        entrypoint="path/to/flow.py:my_flow"
    ).deploy(
        name="my-deployment",
        work_pool_name="my-process-pool"
    )
Or using `prefect.yaml`:
yaml
deployments:
  - name: my-deployment
    entrypoint: "path/to/flow.py:my_flow"
    work_pool:
      name: my-process-pool
The key differences are:
1. No direct infrastructure objects - everything is managed through work pools
2. Process execution is handled by workers instead of infrastructure blocks
3. Configuration is done through work pool settings rather than infrastructure objects
Would you like me to provide more details about configuring process work pools or migrating specific process infrastructure settings?Yu Shao Pang
07/02/2025, 12:34 PMMarvin
07/02/2025, 12:34 PMMarvin
07/02/2025, 12:35 PMbash
# From your terminal (not within a worker)
prefect work-pool create my-process-pool --type process
This can be done:
- From any terminal with Prefect CLI access
- Through the Prefect UI
- Using the REST API
2. Then, start workers (do this on each machine that will run flows):
bash
# In a separate terminal that will keep running
prefect worker start --pool my-process-pool
3. Deploy flows (do this when setting up deployments):
python
my_flow.from_source(
    source=".",
    entrypoint="flow.py:my_flow"
).deploy(
    name="my-deployment",
    work_pool_name="my-process-pool"
)
Key points:
- Work pool creation is a one-time setup step
- Workers connect to existing work pools
- You can have multiple workers for the same work pool
- Workers need to keep running to execute flows
- Work pool creation is independent of worker processes
Think of it like this:
1. Work pool = the configuration/definition of where work can run
2. Worker = the process that actually executes the work
3. You create the work pool first, then start workers that connect to it
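Once a work pool exists and a worker is polling it, you can trigger work with run_deployment; a minimal sketch (the deployment name is a placeholder):
python
from prefect.deployments import run_deployment

# Creates a flow run; the worker polling "my-process-pool" picks it up and executes it
flow_run = run_deployment(name="my-flow/my-deployment")
print(flow_run.id, flow_run.state)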
Would you like me to provide more details about work pool configuration options or worker management?Yu Shao Pang
07/02/2025, 12:37 PMMarvin
07/02/2025, 12:37 PMMarvin
07/02/2025, 12:39 PM
python
import os
from prefect import flow

@flow
def my_flow():
    pass

if __name__ == "__main__":
    if is_ci_build:
        # For CI environment
        my_flow.from_source(
            source=".",
            entrypoint="flow.py:my_flow"
        ).deploy(
            name="my-deployment",
            work_pool_name="my-process-pool",
            job_variables={
                "command": ["${prefect.system.python}", "-m", "prefect.engine"],
                "env": {
                    "PREFECT_API_URL": "${prefect.api.url}",
                    "PREFECT_API_KEY": "${prefect.api.key}"
                }
            }
        )
    else:
        # For local environment
        my_flow.from_source(
            source=".",
            entrypoint="flow.py:my_flow"
        ).deploy(
            name="my-deployment",
            work_pool_name="my-process-pool",
            job_variables={
                "working_dir": os.getcwd(),
                "command": ["python", "-m", "prefect.engine"]
            }
        )
Alternatively, you can configure this in your `prefect.yaml`:
yaml
deployments:
  - name: my-deployment
    entrypoint: flow.py:my_flow
    work_pool:
      name: my-process-pool
    job_variables:
      command:
        - "${prefect.system.python}"  # or "python" for local
        - "-m"
        - "prefect.engine"
      working_dir: "${working_dir}"  # will use current directory
      env:
        PREFECT_API_URL: "${prefect.api.url}"
        PREFECT_API_KEY: "${prefect.api.key}"
Key differences from your 2.x code:
1. No more `Process()._base_flow_run_command()`
2. Job variables replace the infrastructure configuration
3. Template variables like `${prefect.system.python}` replace manual path handling
You can also create different work pools for CI and local:
bash
# Create CI work pool
prefect work-pool create ci-pool --type process
# Create local work pool
prefect work-pool create local-pool --type process
Then use them conditionally:
python
work_pool_name = "ci-pool" if is_ci_build else "local-pool"

my_flow.from_source(
    source=".",
    entrypoint="flow.py:my_flow"
).deploy(
    name="my-deployment",
    work_pool_name=work_pool_name
)
Available template variables include:
- `${prefect.system.python}`: Current Python executable
- `${working_dir}`: Current working directory
- `${prefect.api.url}`: Prefect API URL
- `${prefect.api.key}`: Prefect API key
Would you like me to provide more details about configuring specific job variables or handling other aspects of the process work pool?Marvin
07/02/2025, 12:43 PMpython
from prefect.deployments.base import deploy
# Load and deploy from YAML
result = deploy(prefect_file="path/to/your/prefect.yaml")
Or if you need to work with deployments programmatically:
python
from prefect.client import get_client

async def get_deployment(name: str):
    client = get_client()
    deployment = await client.read_deployment_by_name(name)
    return deployment
The key differences are:
1. `Deployment.load_from_yaml()` no longer exists
2. Use the `deploy()` function for creating/updating deployments from YAML
3. Use the client API for reading existing deployments
If you need to work with existing deployments, the client API is your best option:
python
from prefect.client import get_client

async def work_with_deployment():
    client = get_client()
    # Get deployment
    deployment = await client.read_deployment_by_name("flow-name/deployment-name")
    # Update deployment
    await client.update_deployment(
        deployment.id,
        # update parameters here
    )
Would you like me to provide more specific examples based on how you're using load_from_yaml
in your code?Yu Shao Pang
07/02/2025, 12:50 PMMarvin
07/02/2025, 12:50 PMMarvin
07/02/2025, 12:50 PM
BaseTaskRunner has been removed. Instead, you should use TaskRunner as the base class. Here's how to update your code:
python
# Old way (2.x)
from prefect.task_runners import BaseTaskRunner
# New way (3.x)
from prefect.task_runners import TaskRunner
The available task runners in 3.x are:
1. `TaskRunner` - Base class for implementing task runners
2. `ThreadPoolTaskRunner` - Executes tasks in a thread pool
3. `PrefectTaskRunner` - Default task runner
Example usage:
python
from prefect.task_runners import TaskRunner

class MyCustomTaskRunner(TaskRunner):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def submit(self, task, task_run, parameters):
        # Implement your custom submission logic
        pass
If you were using specific functionality from `BaseTaskRunner`, let me know what features you need, and I can help you implement them with the new `TaskRunner` class.
Yu Shao Pang
07/02/2025, 12:52 PMMarvin
07/02/2025, 12:52 PMMarvin
07/02/2025, 12:53 PM
TaskConcurrencyType has been removed in favor of a new concurrency management system. Here's how to handle task concurrency in 3.x:
1. Using the concurrency context manager (in 3.x the helpers live in `prefect.concurrency.sync` and `prefect.concurrency.asyncio`; slot counts are configured on the named server-side limit):
python
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def my_task():
    # Occupy one slot of the "my-limit" concurrency limit while working
    with concurrency("my-limit", occupy=1):
        ...  # your code here

# Async variant
from prefect.concurrency.asyncio import concurrency as async_concurrency

async def my_async_work():
    async with async_concurrency("my-limit", occupy=1):
        ...  # your code here
2. For rate limiting (the rate itself is configured on the server-side limit; `rate_limit` blocks until a slot is free):
python
from prefect import task
from prefect.concurrency.sync import rate_limit

@task
def my_task():
    rate_limit("my-limit")  # waits for the configured rate before proceeding
    ...
3. For global concurrency limits (via CLI):
bash
# Create a global concurrency limit
prefect gcl create my-limit --slots 5
# Update a limit
prefect gcl update my-limit --slots 10
# Delete a limit
prefect gcl delete my-limit
4. For programmatic concurrency management:
python
from prefect.client import get_client

async def manage_concurrency():
    client = get_client()
    # Create a concurrency limit
    await client.create_concurrency_limit(
        tag="my-tag",
        concurrency_limit=5
    )
    # Update a limit
    await client.update_concurrency_limit(
        tag="my-tag",
        concurrency_limit=10
    )
The key differences from `TaskConcurrencyType` are:
1. More flexible concurrency control through context managers and helper functions
2. Separate handling for rate limiting and concurrency limits
3. Global concurrency limits managed through CLI or client API
4. Built-in support for both sync and async operations
Would you like me to provide more specific examples based on how you were using TaskConcurrencyType
in your code?Yu Shao Pang
07/02/2025, 12:53 PMMarvin
07/02/2025, 12:53 PMMarvin
07/02/2025, 12:54 PM
Could you share how you're using TaskConcurrencyType in your 2.x code? That way, I can show you the exact equivalent in 3.x.
In general, here's a typical migration pattern:
python
# Prefect 2.x way
from prefect.task_runners import BaseTaskRunner
from prefect.utilities.enum import TaskConcurrencyType

class MyTaskRunner(BaseTaskRunner):
    def __init__(self, concurrency_type: TaskConcurrencyType = TaskConcurrencyType.PARALLEL):
        self.concurrency_type = concurrency_type
        super().__init__()

# Prefect 3.x way
from prefect.task_runners import TaskRunner
from prefect.concurrency.asyncio import concurrency

class MyTaskRunner(TaskRunner):
    def __init__(self, max_concurrent_tasks: int = None):
        self.max_concurrent_tasks = max_concurrent_tasks
        super().__init__()

    async def submit(self, task, task_run, parameters):
        # Occupy a slot of a named server-side limit while submitting
        async with concurrency("my-runner", occupy=1):
            return await super().submit(task, task_run, parameters)
If you share your specific use case, I can provide a more targeted migration example.Yu Shao Pang
07/02/2025, 12:54 PMMarvin
07/02/2025, 12:54 PMMarvin
07/02/2025, 12:54 PM
TaskConcurrencyType.PARALLEL was just indicating parallel execution capability. In Prefect 3.x, parallel execution is handled differently. Here's how to migrate:
python
# Old way (2.x)
from prefect.utilities.enum import TaskConcurrencyType

def some_function():
    return TaskConcurrencyType.PARALLEL

# New way (3.x)
# If you just need parallel execution, you can use ThreadPoolTaskRunner
from prefect.task_runners import ThreadPoolTaskRunner

def some_function():
    return ThreadPoolTaskRunner()
Or if this was part of a task runner class:
python
# Old way (2.x)
from prefect.task_runners import BaseTaskRunner
from prefect.utilities.enum import TaskConcurrencyType

class MyTaskRunner(BaseTaskRunner):
    def get_concurrency_type(self):
        return TaskConcurrencyType.PARALLEL

# New way (3.x)
from prefect.task_runners import TaskRunner

class MyTaskRunner(TaskRunner):
    # Parallel execution is now handled by the task runner implementation
    # No need to explicitly specify concurrency type
    pass
The key point is that in 3.x:
- Task concurrency types are no longer explicitly enumerated
- Parallel execution is handled by the task runner implementation itself
- `ThreadPoolTaskRunner` is the default choice for parallel task execution (see the sketch below)
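For example, a minimal sketch of parallel task execution with ThreadPoolTaskRunner (the max_workers value is arbitrary):
python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def double(x: int) -> int:
    return 2 * x

@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def parallel_flow():
    # .submit() sends each task to the runner's thread pool
    futures = [double.submit(i) for i in range(10)]
    return [f.result() for f in futures]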
Would you like me to provide more details about how parallel execution works in the new version?Yu Shao Pang
07/02/2025, 2:38 PMMarvin
07/02/2025, 2:38 PMMarvin
07/02/2025, 2:39 PM
The prefect.events.instrument functionality has been replaced with a new events system. Here's how to migrate:
python
# Old way (2.x)
from prefect.events import instrument

@instrument
def my_function():
    pass

# New way (3.x)
from prefect.events import emit_event

def my_function():
    # Emit specific events (resource dicts are keyed by "prefect.resource.id")
    emit_event(
        event="my-event-name",
        resource={"prefect.resource.id": "my-resource"},
        payload={"key": "value"}
    )
For more complex event handling:
python
from prefect.events import emit_event
from prefect.events.schemas import Resource, Event

async def handle_with_events():
    # Create and emit an event
    emit_event(
        event=Event(
            name="my.custom.event",
            resource=Resource(
                prefix="my.resource",
                name="resource-name"
            ),
            payload={"data": "value"}
        )
    )
# If you need to subscribe to events
from prefect.events import get_events_subscriber

async def listen_to_events():
    subscriber = await get_events_subscriber()
    async for event in subscriber.events():
        # Handle event
        print(f"Received event: {event.name}")
Key differences in 3.x:
1. Direct event emission using `emit_event` instead of decorator-based instrumentation
2. More structured event data with `Event` and `Resource` classes
3. Support for async event handling
4. Built-in event subscription capabilities
Would you like me to provide more specific examples based on how you were using instrument
in your code?Yu Shao Pang
07/02/2025, 2:40 PMMarvin
07/02/2025, 2:40 PMMarvin
07/02/2025, 2:42 PM
Here are replacement patterns for instrument_instance_method_call:
1. Using Events (Recommended Approach):
python
from prefect.events import emit_event
from functools import wraps

def instrument_method(method_name):
    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            resource = {"prefect.resource.id": f"{self.__class__.__name__}.{method_name}"}
            # Emit event before method call (emit_event is synchronous)
            emit_event(
                event=f"{method_name}.start",
                resource=resource,
                payload={"args": repr(args), "kwargs": repr(kwargs)}
            )
            try:
                result = await func(self, *args, **kwargs)
                # Emit success event
                emit_event(
                    event=f"{method_name}.complete",
                    resource=resource
                )
                return result
            except Exception as e:
                # Emit failure event
                emit_event(
                    event=f"{method_name}.failed",
                    resource=resource,
                    payload={"error": str(e)}
                )
                raise
        return wrapper
    return decorator

# Usage:
class MyClass:
    @instrument_method("my_method")
    async def my_method(self, arg1, arg2):
        # method implementation
        pass
2. Using RunTelemetry (For Flow/Task Related Methods):
python
from prefect.telemetry.run_telemetry import RunTelemetry
from functools import wraps

def instrument_run_method(method_name):
    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            telemetry = RunTelemetry()
            with telemetry.start_span(name=method_name) as span:
                try:
                    result = await func(self, *args, **kwargs)
                    span.set_attribute("status", "success")
                    return result
                except Exception as e:
                    span.set_attribute("status", "failed")
                    span.record_exception(e)
                    raise
        return wrapper
    return decorator

# Usage:
class MyClass:
    @instrument_run_method("my_method")
    async def my_method(self, arg1, arg2):
        # method implementation
        pass
3. Using Logging (Simplest Approach):
python
from prefect.logging import get_logger
from functools import wraps

def instrument_method_with_logging(method_name):
    logger = get_logger()

    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            logger.info(f"Starting {method_name}")
            try:
                result = await func(self, *args, **kwargs)
                logger.info(f"Completed {method_name}")
                return result
            except Exception as e:
                logger.error(f"Failed {method_name}: {str(e)}")
                raise
        return wrapper
    return decorator

# Usage:
class MyClass:
    @instrument_method_with_logging("my_method")
    async def my_method(self, arg1, arg2):
        # method implementation
        pass
The key differences from `instrument_instance_method_call`:
1. More explicit event emission
2. Better integration with Prefect's event system
3. More flexible instrumentation options
4. Built-in support for async methods
Would you like me to provide a more specific example based on how you were using instrument_instance_method_call
in your code?Yu Shao Pang
07/02/2025, 2:44 PMMarvin
07/02/2025, 2:44 PMMarvin
07/02/2025, 2:44 PM
Could you share how you're using instrument_instance_method_call in your 2.x code? That way, I can show you the exact migration path for your specific use case.
A typical before/after would look like this:
python
# BEFORE (Prefect 2.x)
from prefect.events import instrument_instance_method_call

class MyClass:
    @instrument_instance_method_call
    def my_method(self, arg1, arg2):
        # method implementation
        pass

# AFTER (Prefect 3.x)
from prefect.events import emit_event
from functools import wraps

def instrument_method(func):
    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        method_name = func.__name__
        resource = {"prefect.resource.id": f"{self.__class__.__name__}.{method_name}"}
        emit_event(
            event=f"{self.__class__.__name__}.{method_name}.start",
            resource=resource
        )
        try:
            result = await func(self, *args, **kwargs)
            emit_event(
                event=f"{self.__class__.__name__}.{method_name}.complete",
                resource=resource
            )
            return result
        except Exception as e:
            emit_event(
                event=f"{self.__class__.__name__}.{method_name}.failed",
                resource=resource,
                payload={"error": str(e)}
            )
            raise
    return wrapper

class MyClass:
    @instrument_method
    async def my_method(self, arg1, arg2):
        # method implementation
        pass
If you share your specific usage, I can provide a more targeted migration example that matches your exact needs.Yu Shao Pang
07/02/2025, 3:08 PMMarvin
07/02/2025, 3:08 PMMarvin
07/02/2025, 3:09 PM
There is no direct equivalent of TaskConcurrencyType in Prefect 3.x. The concept of explicitly defining concurrency types through enums has been removed in favor of more direct concurrency control through task runners and concurrency limits.
If you need to indicate parallel execution capability, you have these options:
1. Use a boolean (simplest):
python
PARALLEL = True

def your_function():
    return PARALLEL

2. Use Python's enum if you want to maintain enum-like behavior:
python
from enum import Enum

class ConcurrencyType(Enum):
    PARALLEL = "parallel"
    SEQUENTIAL = "sequential"

def your_function():
    return ConcurrencyType.PARALLEL

3. Use the task runner directly (most Prefect 3.x-like):
python
from prefect.task_runners import ThreadPoolTaskRunner

def your_function():
    return ThreadPoolTaskRunner()
The key point is that in Prefect 3.x:
- Concurrency is handled through task runners and concurrency limits
- There's no need for explicit concurrency type enums
- Parallel execution is the default when using appropriate task runners
Would you like me to show you how to implement any of these approaches in your specific code?Yu Shao Pang
07/02/2025, 3:11 PMdynamic_task_runner_env
.
Also, this automatically takes care of any dask/memory_limits on kubernetes
This should be used in your flow definition as follows:
@flow(..., task_runner=DynamicTaskRunner)
def my_flow():
...
"""
assert PREF_TASK_RUNNER_CONFIG in os.environ, (
    f"No {PREF_TASK_RUNNER_CONFIG} found, please see the docstring of: "
    "`sp_prefect_extensions.task_runners.dynamic_task_runner_env` for usage"
)
task_runner_config = DynamicTaskRunnerConfig(
    **json.loads(os.environ[PREF_TASK_RUNNER_CONFIG])
)
self.logger.info("TASK RUNNER: %s", task_runner_config)
if task_runner_config.type == TaskRunner.DaskTaskRunner:
    # The below package needs to be provided by the client
    from prefect_dask import DaskTaskRunner  # type: ignore

    params = task_runner_config.params
    if kubernetes.on_kubernetes():
        if "memory_limit" in params.cluster_kwargs:
            params.cluster_kwargs[
                "memory_limit"
            ] = kubernetes.k8s_to_dask_memory(
                params.cluster_kwargs["memory_limit"]
            )
        elif params.cluster_class is None:
            # Only automatically setting the limit for the default cluster
            # class, as e.g. cluster_class="dask_jobqueue.SLURMCluster" does
            # not have this param...
            # Set it slightly lower than the K8s limit (note that this is in
            # bytes, so no conversion from dask to k8s is necessary)
            k8s_limit = kubernetes.get_memory_limit()
            if k8s_limit is not None:
                params.cluster_kwargs["memory_limit"] = k8s_limit - 1000
    return DaskTaskRunner(**asdict(params))
elif task_runner_config.type == TaskRunner.RayTaskRunner:
    # The below package needs to be provided by the client
    from prefect_ray import RayTaskRunner  # type: ignore

    return RayTaskRunner(**asdict(task_runner_config.params))
elif task_runner_config.type == TaskRunner.SequentialTaskRunner:
    # The below package needs to be provided by the client
    from prefect.task_runners import SequentialTaskRunner

    return SequentialTaskRunner()
elif task_runner_config.type == TaskRunner.ConcurrentTaskRunner:
    # The below package needs to be provided by the client
    from prefect.task_runners import ConcurrentTaskRunner

    return ConcurrentTaskRunner()
else:
    raise NotImplementedError(task_runner_config.type)

@property
def concurrency_type(self):
    # Should not really have to provide an implementation for this, however
    # there is a bug in a prefect log statement here:
    # https://github.com/PrefectHQ/prefect/blob/main/src/prefect/engine.py#L503
    # where it uses flow.task_runner instead of task_runner
    # Returning PARALLEL even though it could be CONCURRENT for dask if
    # cluster_kwargs.processes is not set. However, this is not used anywhere
    # of importance
    return TaskConcurrencyType.PARALLEL

async def submit(self, key, call):
    """Should never be called as the flow run actually calls duplicate() first"""
    raise NotImplementedError()

async def wait(self, key, timeout):
    """Should never be called as the flow run actually calls duplicate() first"""
    raise NotImplementedError()
Marvin
07/02/2025, 3:11 PMMarvin
07/02/2025, 3:11 PM
python
from prefect.task_runners import TaskRunner
from dataclasses import asdict
from typing import Literal, Optional
import os
import json

class DynamicTaskRunner(TaskRunner):
    """Proxy for Ray/Dask Task runners. See module docstring for more info."""

    def __init__(self):
        super().__init__()
        # In 3.x, we don't need concurrency_type anymore

    def duplicate(self) -> TaskRunner:
        """When a new flow run is started it calls this method to get the task runner.

        Constructs the concrete task runner via config in environment variable.
        See `dynamic_task_runner_env`.
        Also, this automatically takes care of any dask/memory_limits on kubernetes.
        This should be used in your flow definition as follows:

        @flow(task_runner=DynamicTaskRunner())
        def my_flow():
            ...
        """
        assert PREF_TASK_RUNNER_CONFIG in os.environ, (
            f"No {PREF_TASK_RUNNER_CONFIG} found, please see the docstring of: "
            "`sp_prefect_extensions.task_runners.dynamic_task_runner_env` for usage"
        )
        task_runner_config = DynamicTaskRunnerConfig(
            **json.loads(os.environ[PREF_TASK_RUNNER_CONFIG])
        )
        self.logger.info("TASK RUNNER: %s", task_runner_config)
        if task_runner_config.type == "DaskTaskRunner":
            # The below package needs to be provided by the client
            from prefect_dask import DaskTaskRunner  # type: ignore

            params = task_runner_config.params
            if kubernetes.on_kubernetes():
                if "memory_limit" in params.cluster_kwargs:
                    params.cluster_kwargs[
                        "memory_limit"
                    ] = kubernetes.k8s_to_dask_memory(
                        params.cluster_kwargs["memory_limit"]
                    )
                elif params.cluster_class is None:
                    # Only automatically setting the limit for the default cluster
                    # class, as e.g. cluster_class="dask_jobqueue.SLURMCluster" does
                    # not have this param...
                    # Set it slightly lower than the K8s limit (note that this is in
                    # bytes, so no conversion from dask to k8s is necessary)
                    k8s_limit = kubernetes.get_memory_limit()
                    if k8s_limit is not None:
                        params.cluster_kwargs["memory_limit"] = k8s_limit - 1000
            return DaskTaskRunner(**asdict(params))
        elif task_runner_config.type == "RayTaskRunner":
            # The below package needs to be provided by the client
            from prefect_ray import RayTaskRunner  # type: ignore

            return RayTaskRunner(**asdict(task_runner_config.params))
        elif task_runner_config.type == "SequentialTaskRunner":
            from prefect.task_runners import SequentialTaskRunner

            return SequentialTaskRunner()
        elif task_runner_config.type == "ThreadPoolTaskRunner":  # Changed from ConcurrentTaskRunner
            from prefect.task_runners import ThreadPoolTaskRunner  # Changed from ConcurrentTaskRunner

            return ThreadPoolTaskRunner()
        else:
            raise NotImplementedError(task_runner_config.type)

    async def submit(self, task, task_run, parameters):
        """Should never be called as the flow run actually calls duplicate() first"""
        raise NotImplementedError()

# Helper class for type safety
class DynamicTaskRunnerConfig:
    def __init__(
        self,
        type: Literal["DaskTaskRunner", "RayTaskRunner", "SequentialTaskRunner", "ThreadPoolTaskRunner"],
        params: Optional[dict] = None
    ):
        self.type = type
        self.params = params or {}
Key changes made:
1. Removed the `concurrency_type` property as it's no longer needed in 3.x
Yu Shao Pang
07/02/2025, 3:24 PMMarvin
07/02/2025, 3:24 PMMarvin
07/02/2025, 3:24 PM
TaskRunner is a class, not an enum. Let me help you fix that.
```python
from prefect.task_runners import TaskRunner
from dataclasses import asdict
from typing import Literal, Optional
from enum import Enum
import os
import json

# Create a separate enum for task runner types
class TaskRunnerType(str, Enum):
    DASK = "DaskTaskRunner"
    RAY = "RayTaskRunner"
    SEQUENTIAL = "SequentialTaskRunner"
    THREAD_POOL = "ThreadPoolTaskRunner"  # Changed from ConcurrentTaskRunner in 2.x

class DynamicTaskRunner(TaskRunner):
    """Proxy for Ray/Dask Task runners. See module docstring for more info."""

    def __init__(self):
        super().__init__()

    def duplicate(self) -> TaskRunner:
        """When a new flow run is started it calls this method to get the task runner.

        Constructs the concrete task runner via config in environment variable.
        See `dynamic_task_runner_env`.
        Also, this automatically takes care of any dask/memory_limits on kubernetes.
        This should be used in your flow definition as follows:

        @flow(task_runner=DynamicTaskRunner())
        def my_flow():
            ...
        """
        assert PREF_TASK_RUNNER_CONFIG in os.environ, (
            f"No {PREF_TASK_RUNNER_CONFIG} found, please see the docstring of: "
            "`sp_prefect_extensions.task_runners.dynamic_task_runner_env` for usage"
        )
        task_runner_config = DynamicTaskRunnerConfig(
            **json.loads(os.environ[PREF_TASK_RUNNER_CONFIG])
        )
        self.logger.info("TASK RUNNER: %s", task_runner_config)
        if task_runner_config.type == TaskRunnerType.DASK:
            # The below package needs to be provided by the client
            from prefect_dask import DaskTaskRunner  # type: ignore

            params = task_runner_config.params
            if kubernetes.on_kubernetes():
                if "memory_limit" in params.cluster_kwargs:
                    params.cluster_kwargs[
                        "memory_limit"
                    ] = kubernetes.k8s_to_dask_memory(
                        params.cluster_kwargs["memory_limit"]
                    )
                elif params.cluster_class is None:
                    # Only automatically setting the limit for the default cluster
                    # class, as e.g. cluster_class="dask_jobqueue.SLURMCluster" does
                    # not have this param...
                    # Set it slightly lower than the K8s limit (note that this is in
                    # bytes, so no conversion from dask to k8s is necessary)
                    k8s_limit = kubernetes.get_memory_limit()
                    if k8s_limit is not None:
                        params.cluster_kwargs["memory_limit"] = k8s_limit - 1000
            return DaskTaskRunner(**asdict(params))
        elif task_runner_config.type == TaskRunnerType.RAY:
            # The below package needs to be provided by the client
            from prefect_ray import RayTaskRunner  # type: ignore

            return RayTaskRunner(**asdict(task_runner_config.params))
        elif task_runner_config.type == TaskRunnerType.SEQUENTIAL:
            from prefect.task_runners import SequentialTaskRunner

            return SequentialTaskRunner()
        elif task_runner_config.type == TaskRunnerType.THREAD_POOL:
            from prefect.task_runners import ThreadPoolTaskRunner

            return ThreadPoolTaskRunner()
        else:
            raise NotImplementedError(task_runner_config.type)

    async def submit(self, task, task_run, parameters):
        """Should never be called as the flow run actually calls duplicate() first"""
        raise NotImplementedError()

# Helper class for type safety
class DynamicTaskRunnerConfig:
    def __init__(
        self,
        type: TaskRunnerType,
        params: Optional[dict] = None
    ):
        self.type = type
        self.params = params or {}
```
Yu Shao Pang
07/02/2025, 3:26 PMMarvin
07/02/2025, 3:26 PMMarvin
07/02/2025, 3:27 PM
You might be importing TaskRunner from the wrong location or getting a name collision. Let's verify the correct import and usage:
python
# Make sure you're importing from the correct location
from prefect.task_runners import TaskRunner
# And let's verify it's actually a class
print(type(TaskRunner)) # Should print "<class 'type'>"
Could you share:
1. Your exact import statement for TaskRunner
2. The full error traceback
This will help us identify if there's a naming collision or incorrect import that's causing TaskRunner to be seen as an enumeration when it should be a class.
The correct structure should be:
python
from prefect.task_runners import TaskRunner

class DynamicTaskRunner(TaskRunner):
    def __init__(self):
        super().__init__()

    async def submit(self, task, task_run, parameters):
        raise NotImplementedError()
If you're still getting the enumeration error, there might be:
1. A name collision with another TaskRunner import
2. A cached `.pyc` file causing issues
3. An incorrect import path
Could you share those details so we can identify the root cause?Yu Shao Pang
07/02/2025, 3:42 PMMarvin
07/02/2025, 3:42 PMMarvin
07/02/2025, 3:43 PM
1. Using Prefect's built-in DateTime type:
python
from pydantic import BaseModel
from prefect.types import DateTime

# In your model/schema
class YourModel(BaseModel):
    datetime_field: DateTime
2. Using Pydantic's datetime field with timezone (if you need explicit timezone handling):
python
from datetime import datetime
from pydantic import Field
from zoneinfo import ZoneInfo
# In your model/schema
datetime_field: datetime = Field(default_factory=lambda: datetime.now(ZoneInfo("UTC")))
3. If you need more specific timezone handling:
python
from datetime import datetime
from pydantic import BaseModel, Field
from zoneinfo import ZoneInfo

class DateTimeTZ(datetime):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if isinstance(v, datetime):
            if v.tzinfo is None:
                v = v.replace(tzinfo=ZoneInfo("UTC"))
            return v
        raise ValueError('datetime required')

# Usage in your model
class YourModel(BaseModel):
    datetime_field: DateTimeTZ
4. For the simplest approach with UTC:
python
from datetime import datetime, timezone
from pydantic import BaseModel

class YourModel(BaseModel):
    datetime_field: datetime

    class Config:
        json_encoders = {
            datetime: lambda dt: dt.replace(tzinfo=timezone.utc).isoformat()
        }
The key differences in 3.x are:
- More reliance on Pydantic's native datetime handling
- Simplified timezone management through Python's zoneinfo
- Use of Prefect's built-in `DateTime` type for standard cases
Would you like me to provide more specific examples based on how you're using DateTimeTZ
in your code?Yu Shao Pang
07/02/2025, 3:52 PMMarvin
07/02/2025, 3:52 PMMarvin
07/02/2025, 3:53 PM
1. Recommended way:
python
from prefect.client import get_client

client = get_client()  # Returns PrefectClient instance
2. Direct import from orchestration module:
python
from prefect.client.orchestration import PrefectClient
client = PrefectClient()
3. If you need the synchronous version:
python
from prefect.client.orchestration import SyncPrefectClient
client = SyncPrefectClient()
The key differences in 3.x are:
- PrefectClient is now in the orchestration module
- There's a convenient `get_client()` function that handles configuration
- Separate sync and async client classes are available

The recommended approach is to use `get_client()` as it handles configuration and environment settings automatically:
python
from prefect.client import get_client

async def my_function():
    client = get_client()
    # Use the client
    flows = await client.read_flows()
Would you like me to provide more examples of how to use the client in specific scenarios?Yu Shao Pang
07/02/2025, 4:07 PMMarvin
07/02/2025, 4:07 PMMarvin
07/02/2025, 4:10 PM
python
from dataclasses import dataclass
from prefect.client.schemas.objects import Deployment
from typing import Optional

@dataclass(frozen=True)
class DeploymentRecord:
    """A subset of Deployment values to be recorded for later validation by workers."""
    id: str
    version: str
    flow_id: str
    command: str
    entrypoint: str
    app: str

    @staticmethod
    def from_deployment(deployment: Deployment) -> "DeploymentRecord":
        """Create a DeploymentRecord from a Prefect 3.x deployment."""
        app_name = [
            el.split("app:")[-1] for el in deployment.tags if el.startswith("app:")
        ]
        assert len(app_name) == 1, "Tag app:<appname> missing"
        app_name = app_name[0]
        # In 3.x, command is in job_variables instead of infra_overrides
        command = deployment.job_variables.get("command", "")
        if isinstance(command, list):
            command = " ".join(command)
        return DeploymentRecord(
            id=str(deployment.id),
            version=deployment.version,
            flow_id=str(deployment.flow_id),
            command=command,
            entrypoint=deployment.entrypoint,
            app=app_name,
        )

    @staticmethod
    async def from_deployment_name(deployment_name: str) -> "DeploymentRecord":
        """Alternative constructor using deployment name."""
        from prefect.client import get_client

        client = get_client()
        deployment = await client.read_deployment_by_name(deployment_name)
        return DeploymentRecord.from_deployment(deployment)
Key changes:
1. Removed `config=dict(extra="forbid")` as it's not needed with a standard dataclass
2. Changed `infra_overrides["command"]` to `job_variables.get("command", "")`
3. Added handling for command as a list (common in 3.x)
4. Added an alternative constructor using the deployment name
Usage example:
python
# Using deployment object
deployment_record = DeploymentRecord.from_deployment(deployment)
# Using deployment name
deployment_record = await DeploymentRecord.from_deployment_name("flow-name/deployment-name")
Note that in Prefect 3.x:
- Infrastructure configuration is handled through work pools and job variables
- Commands are often specified as lists in job variables
- The deployment object structure is more standardized
Would you like me to explain any specific part of the migration in more detail?Yu Shao Pang
07/02/2025, 4:44 PMMarvin
07/02/2025, 4:44 PMMarvin
07/02/2025, 4:45 PM
python
from prefect.client import get_client

# Old way (2.x)
filter_criteria = {
    "deployments": {
        "operator": "and_",
        "id": None,
        "name": None,
        "paused": None,
        "is_schedule_active": None,  # This field no longer exists
        "tags": {
            "operator": "and_",
            "all_": ["app:cron-pipelines"],
            "is_null_": None
        },
        "work_queue_name": None
    }
}

# New way (3.x)
async def get_deployments():
    client = get_client()
    return await client.read_deployments(
        tags=["app:cron-pipelines"]  # Simplified filter syntax
    )

# If you need more complex filtering:
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterTags

async def get_deployments_complex():
    client = get_client()
    deployment_filter = DeploymentFilter(
        tags=DeploymentFilterTags(
            all_=["app:cron-pipelines"]
        )
    )
    return await client.read_deployments(deployment_filter=deployment_filter)
For your CI pipeline, you might want to update it like this:
python
from prefect.client import get_client
from prefect.client.schemas.filters import DeploymentFilter

async def get_pipeline_deployments():
    client = get_client()
    # Simple version
    deployments = await client.read_deployments(
        tags=["app:cron-pipelines"]
    )
    # Or if you need to check schedule status:
    active_deployments = [
        d for d in deployments
        if d.schedule is not None and not d.paused
    ]
    return active_deployments

# If you're using it in a script:
if __name__ == "__main__":
    import asyncio
    deployments = asyncio.run(get_pipeline_deployments())
Key changes:
1. Removed `is_schedule_active` from the filter criteria
2. Simplified the filter structure
3. Use the client's built-in filtering capabilities
4. Handle schedule status checking after fetching deployments
If you need to check for active schedules specifically:
python
async def get_active_scheduled_deployments():
    client = get_client()
    deployments = await client.read_deployments(
        tags=["app:cron-pipelines"]
    )
    return [
        d for d in deployments
        if (d.schedule is not None and  # Has a schedule
            not d.paused and            # Not paused
            d.schedule.active)          # Schedule is active
    ]
Would you like me to provide more specific examples based on your CI pipeline's needs?Marvin
07/02/2025, 4:46 PMMarvin
07/02/2025, 4:46 PMpython
from prefect.client import get_client
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterTags

# Old way (2.x) that's causing the error:
filter_criteria = {
    "deployments": {
        "operator": "and_",
        "id": None,
        "name": None,
        "paused": None,
        "is_schedule_active": None,  # This field is causing the 422 error
        "tags": {
            "operator": "and_",
            "all_": ["app:cron-pipelines"],
            "is_null_": None
        },
        "work_queue_name": None
    }
}

# New way (3.x)
async def get_deployments():
    client = get_client()
    # Option 1: Simple filtering
    deployments = await client.read_deployments(
        tags=["app:cron-pipelines"]
    )
    return deployments

# Option 2: Using DeploymentFilter for more complex criteria
async def get_deployments_filtered():
    client = get_client()
    deployment_filter = DeploymentFilter(
        tags=DeploymentFilterTags(
            all_=["app:cron-pipelines"]
        )
    )
    deployments = await client.read_deployments(
        deployment_filter=deployment_filter
    )
    return deployments
If you're using this in a CI script, here's a complete example:
python
from prefect.client import get_client
import asyncio

async def main():
    client = get_client()
    # Get deployments
    deployments = await client.read_deployments(
        tags=["app:cron-pipelines"]
    )
    # If you need to check for active schedules
    active_deployments = [
        d for d in deployments
        if d.schedule is not None and not d.paused
    ]
    # Process your deployments
    for deployment in active_deployments:
        # Your deployment logic here
        print(f"Processing deployment: {deployment.name}")

if __name__ == "__main__":
    asyncio.run(main())
The key changes are:
1. Removed the deprecated `is_schedule_active` filter
2. Use the new client API methods directly
3. Filter for schedules after fetching the deployments
Would you like me to provide more specific examples based on what your CI pipeline is trying to accomplish?Yu Shao Pang
07/02/2025, 4:52 PMMarvin
07/02/2025, 4:52 PMMarvin
07/02/2025, 4:53 PM
python
import logging
from typing import List
from prefect.client import get_client
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterTags
from prefect.client.schemas.responses import DeploymentResponse

async def get_current_deployments(
    app_name: str,
    api_url: str
) -> List[DeploymentResponse]:  # or your custom DeploymentResponseWithUpdatedTime
    """Gets the current deployments for the specified `app_name`"""
    logging.info(
        "Reading deployments for app '%s' from '%s'",
        app_name,
        api_url if api_url is not None else "DEFAULT"
    )
    client = get_client()  # Note: context manager not needed in 3.x
    # Create the filter correctly
    deployment_filter = DeploymentFilter(
        tags=DeploymentFilterTags(
            all_=[f"app:{app_name}"]
        )
    )
    # Use the client's read_deployments method directly
    return await client.read_deployments(
        deployment_filter=deployment_filter
    )
Key changes:
1. Removed the async context manager (`async with`) as it's not needed in 3.x
2. Fixed the filter syntax using `DeploymentFilterTags` instead of a raw dict
3. Using the client's `read_deployments` method directly
Alternative simpler version if you don't need complex filtering:
python
async def get_current_deployments(
    app_name: str,
    api_url: str
) -> List[DeploymentResponse]:
    """Gets the current deployments for the specified `app_name`"""
    logging.info(
        "Reading deployments for app '%s' from '%s'",
        app_name,
        api_url if api_url is not None else "DEFAULT"
    )
    client = get_client()
    # Simpler tag filtering
    return await client.read_deployments(
        tags=[f"app:{app_name}"]
    )
If you're using a custom `DeploymentResponseWithUpdatedTime` type, you might need to adjust the response handling:
python
from dataclasses import dataclass
from datetime import datetime
from prefect.client.schemas.responses import DeploymentResponse

@dataclass
class DeploymentResponseWithUpdatedTime:
    # Your custom fields here
    deployment: DeploymentResponse
    updated_time: datetime

async def get_current_deployments(
    app_name: str,
    api_url: str
) -> List[DeploymentResponseWithUpdatedTime]:
    """Gets the current deployments for the specified `app_name`"""
    logging.info(
        "Reading deployments for app '%s' from '%s'",
        app_name,
        api_url if api_url is not None else "DEFAULT"
    )
    client = get_client()
    deployments = await client.read_deployments(
        tags=[f"app:{app_name}"]
    )
    # Convert to your custom type if needed
    return [
        DeploymentResponseWithUpdatedTime(
            deployment=deployment,
            updated_time=deployment.updated  # adjust field name as needed
        )
        for deployment in deployments
    ]
Would you like me to provide more details about any part of this change?Yu Shao Pang
07/02/2025, 5:14 PMMarvin
07/02/2025, 5:14 PMMarvin
07/02/2025, 5:15 PM
python
from typing import List
from prefect.client import get_client
from prefect.client.schemas.responses import DeploymentResponse

async def get_current_deployments(
    app_name: str,
    api_url: str
) -> List[DeploymentResponse]:
    """Gets the current deployments for the specified `app_name`"""
    logging.info(
        "Reading deployments for app '%s' from '%s'",
        app_name,
        api_url if api_url is not None else "DEFAULT"
    )
    client = get_client()
    # Simplest form - just pass the tag directly
    return await client.read_deployments(
        tags=[f"app:{app_name}"]
    )
If you still need to use the filter object, try this version:
python
from prefect.client import get_client
from prefect.client.schemas.filters import DeploymentFilter

async def get_current_deployments(
    app_name: str,
    api_url: str
) -> List[DeploymentResponse]:
    logging.info(
        "Reading deployments for app '%s' from '%s'",
        app_name,
        api_url if api_url is not None else "DEFAULT"
    )
    client = get_client()
    # Create filter without any extra fields
    filter_obj = DeploymentFilter(
        tags=[f"app:{app_name}"]  # Direct list instead of nested dict
    )
    return await client.read_deployments(
        deployment_filter=filter_obj
    )
If you're still getting the 422 error, could you share:
1. The complete error message
2. The version of Prefect you're using (`prefect version`)
This will help us identify if there might be version-specific issues or if we need to adjust the filter structure further.
Also, you might want to try printing the filter object before sending it to see exactly what's being sent:
python
async def get_current_deployments(
    app_name: str,
    api_url: str
) -> List[DeploymentResponse]:
    client = get_client()
    filter_obj = DeploymentFilter(
        tags=[f"app:{app_name}"]
    )
    # Debug print
    print("Filter object:", filter_obj.dict())
    return await client.read_deployments(
        deployment_filter=filter_obj
    )
Would you like me to provide more debugging steps or alternative approaches?