Giacomo Chiarella
07/01/2025, 6:46 PMMarvin
07/01/2025, 6:46 PMMarvin
07/01/2025, 6:47 PM/api/deployments/{id}
endpoint. This is actually expected behavior - in Prefect 3.x, deployment updates should be handled through the Python client rather than direct API calls.
Instead of using a PATCH request, you should:
1. Use the Python client to update deployments:
python
from prefect import get_client
async with get_client() as client:
# Fetch the deployment
deployment = await client.read_deployment("deployment-id")
# Update the deployment properties
deployment.description = "New description"
# Save the changes
await client.update_deployment(deployment)
2. Or use the CLI:
bash
prefect deployment update my-deployment-name
If you're working on automation that requires HTTP API access, you might want to look into using the Python client in your automation code instead, as it provides a more stable and supported interface for deployment management.
The direct REST API endpoints are primarily used by the Prefect UI and the Python client internally. For the most reliable experience, stick to the Python client or CLI for deployment management operations.Giacomo Chiarella
07/01/2025, 6:55 PMMarvin
07/01/2025, 6:55 PMMarvin
07/01/2025, 6:56 PMpython
from prefect import get_client
async with get_client() as client:
# Test basic connectivity
try:
await client.hello()
except Exception as e:
print(f"Connection error: {e}")
2. Check your API URL and authentication:
python
import prefect.settings
print(prefect.settings.PREFECT_API_URL.value())
3. Try the read_deployments call with minimal parameters first:
python
async with get_client() as client:
try:
deployments = await client.read_deployments()
print(f"Found {len(deployments)} deployments")
except Exception as e:
print(f"Error: {e}")
4. If that works, add parameters incrementally to identify which one might be causing the issue. Make sure:
- limit
is a positive integer
- offset
is a non-negative integer
- total_len
is actually defined in your code before using it
- DeploymentSort.NAME_ASC
is properly imported from prefect.client.schemas
Can you try these steps and let me know:
1. Are you able to connect to the server at all?
2. Does the basic read_deployments()
call without parameters work?
3. What version of Prefect are you using? (prefect version
)
This will help narrow down the exact cause of the JSONDecodeError.Giacomo Chiarella
07/01/2025, 7:00 PMasync with get_client() as client:
# Fetch the deployment
deployment = await client.read_deployment("deployment-id")
# Update the deployment properties
deployment.description = "New description"
# Save the changes
await client.update_deployment(deployment)
and I get this
prefect.exceptions.PrefectHTTPStatusError: Client error ‘422 Unprocessable Entity’Giacomo Chiarella
07/01/2025, 7:00 PMasync with get_client() as client:
# Fetch the deployment
deployment = await client.read_deployment("deployment-id")
# Update the deployment properties
deployment.description = "New description"
# Save the changes
await client.update_deployment(deployment)
and I get this
prefect.exceptions.PrefectHTTPStatusError: Client error ‘422 Unprocessable Entity’Marvin
07/01/2025, 7:00 PMMarvin
07/01/2025, 7:01 PMpython
from prefect import get_client
from prefect.client.schemas import DeploymentUpdate
async with get_client() as client:
# First get the existing deployment
deployment = await client.read_deployment("your-deployment-id")
# Create a DeploymentUpdate object with your changes
deployment_update = DeploymentUpdate(
description="New description",
# Keep other fields you want to preserve
name=deployment.name,
flow_id=deployment.flow_id,
)
# Update the deployment
updated_deployment = await client.update_deployment(
"your-deployment-id",
deployment_update
)
3. If you only want to update specific fields, you can use a more targeted approach:
python
async with get_client() as client:
updated_deployment = await client.update_deployment(
"your-deployment-id",
DeploymentUpdate(description="New description")
)
The key differences from your original code are:
- Using DeploymentUpdate
instead of modifying the deployment object directly
- Passing both the deployment ID and the update object to update_deployment
If you're still getting a 422 error, can you share:
1. The complete error message (it should include details about which fields are invalid)
2. The actual deployment ID you're using (with sensitive parts redacted if necessary)
This will help identify exactly what's causing the validation error.Giacomo Chiarella
07/02/2025, 9:46 AMMarvin
07/02/2025, 9:46 AMMarvin
07/02/2025, 9:48 AMpython
from prefect.client import get_client
from prefect.client.schemas.filters import (
TaskRunFilter,
TaskRunFilterStateType,
TaskRunFilterTags,
)
async def get_filtered_task_runs():
client = get_client()
# Create the filter
task_filter = TaskRunFilter(
tags=TaskRunFilterTags(all_=["my_tag"]),
state_type=TaskRunFilterStateType(
any_=[
"RUNNING",
"PENDING",
"CANCELLING",
"SCHEDULED"
]
)
)
# Query the task runs
task_runs = await client.read_task_runs(task_filter)
return task_runs
# Use in an async context
task_runs = await get_filtered_task_runs()
A few important notes:
1. The code needs to run in an async context since the client methods are asynchronous
2. The all_
parameter in TaskRunFilterTags
ensures the task run has the specified tag
3. The any_
parameter in TaskRunFilterStateType
matches task runs that are in any of the specified states
4. State names should be uppercase strings as shown in the example
You can iterate through the returned task runs to access their properties:
python
for task_run in task_runs:
print(f"Task Run ID: {task_run.id}")
print(f"State: {task_run.state.type}")
print(f"Tags: {task_run.tags}")
The query will return all task runs that match both criteria (having the tag "my_tag" AND being in one of the specified states).Giacomo Chiarella
07/02/2025, 2:18 PMtask_filter = TaskRunFilter(
tags=TaskRunFilterTags(all_=[tag]),
state_type=TaskRunFilterStateType(
any_=[
StateType.RUNNING.value,
StateType.PENDING.value,
StateType.CANCELLING.value,
StateType.SCHEDULED.value,
StateType.PAUSED.value
]
)
)
I want to AND the conditionsMarvin
07/02/2025, 2:18 PMMarvin
07/02/2025, 2:19 PMTaskRunFilterState
instead of TaskRunFilterStateType
. Here's the correct way to combine the conditions with AND logic:
python
from prefect.client import get_client
from prefect.client.schemas.filters import (
TaskRunFilter,
TaskRunFilterTags,
TaskRunFilterState,
)
from prefect.states import State, StateType
async def get_filtered_task_runs(tag: str):
client = get_client()
# Create the filter
task_filter = TaskRunFilter(
tags=TaskRunFilterTags(all_=[tag]),
state=TaskRunFilterState(
type=TaskRunFilterStateType(
any_=[
StateType.RUNNING,
StateType.PENDING,
StateType.CANCELLING,
StateType.SCHEDULED,
StateType.PAUSED,
]
)
)
)
# Query the task runs
task_runs = await client.read_task_runs(task_filter)
return task_runs
The key differences in this corrected version are:
1. We use TaskRunFilterState
instead of directly using TaskRunFilterStateType
2. The state type filter is nested within the state
parameter
3. We use StateType
enum values directly instead of their string values
This will properly return task runs that satisfy BOTH conditions:
- Have the specified tag AND
- Are in one of the specified states
The filter conditions are combined with AND logic by default in the TaskRunFilter
, so this will give you exactly what you're looking for - task runs that match both criteria simultaneously.Giacomo Chiarella
07/03/2025, 3:46 PMtask_filter = TaskRunFilter(
tags=TaskRunFilterTags(all_=[tag]),
state=TaskRunFilterState(
type=TaskRunFilterStateType(
any_=[
StateType.RUNNING.value,
StateType.PENDING.value,
StateType.CANCELLING.value,
StateType.SCHEDULED.value,
StateType.PAUSED.value
]
)
)
)
task_runs = await client.read_task_runs(task_run_filter=task_filter, sort=TaskRunSort.NAME_ASC, limit=limit,
offset=offset)
but I’ve got
pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskRunFilterState
name
Field required [type=missing, input_value={'type': TaskRunFilterSta...LED, StateType.PAUSED])}, input_type=dict]
Marvin
07/03/2025, 3:46 PMMarvin
07/03/2025, 3:47 PMstate_type
directly in the TaskRunFilter
instead of nesting it under state
. Here's the corrected version:
python
task_filter = TaskRunFilter(
tags=TaskRunFilterTags(all_=[tag]),
state_type=TaskRunFilterStateType(
any_=[
StateType.RUNNING.value,
StateType.PENDING.value,
StateType.CANCELLING.value,
StateType.SCHEDULED.value,
StateType.PAUSED.value
]
)
)
task_runs = await client.read_task_runs(
task_run_filter=task_filter,
sort=TaskRunSort.NAME_ASC,
limit=limit,
offset=offset
)
The key change is that we're using state_type
directly in the TaskRunFilter
instead of trying to nest it under a state
parameter. This should resolve the validation error you're seeing while still maintaining the AND logic between the tags and state type conditions.Giacomo Chiarella
07/03/2025, 3:48 PMtask_filter = TaskRunFilter(
tags=TaskRunFilterTags(all_=[tag]),
state=TaskRunFilterState(
type=TaskRunFilterStateType(
any_=[
StateType.RUNNING.value,
StateType.PENDING.value,
StateType.CANCELLING.value,
StateType.SCHEDULED.value,
StateType.PAUSED.value
]
)
)
)
task_runs = await client.read_task_runs(task_run_filter=task_filter, sort=TaskRunSort.NAME_ASC, limit=limit,
offset=offset)
but I’ve got
pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskRunFilterState
name
Field required [type=missing, input_value={'type': TaskRunFilterSta...LED, StateType.PAUSED])}, input_type=dict]
there is no state_type nowhereMarvin
07/03/2025, 3:48 PMMarvin
07/03/2025, 3:48 PMTaskRunFilterState
requires a name
field. Let's modify the filter to include both the state type and name:
python
task_filter = TaskRunFilter(
tags=TaskRunFilterTags(all_=[tag]),
state=TaskRunFilterState(
name=TaskRunFilterStateName(
any_=[
StateType.RUNNING.value,
StateType.PENDING.value,
StateType.CANCELLING.value,
StateType.SCHEDULED.value,
StateType.PAUSED.value
]
),
type=TaskRunFilterStateType(
any_=[
StateType.RUNNING.value,
StateType.PENDING.value,
StateType.CANCELLING.value,
StateType.SCHEDULED.value,
StateType.PAUSED.value
]
)
)
)
task_runs = await client.read_task_runs(
task_run_filter=task_filter,
sort=TaskRunSort.NAME_ASC,
limit=limit,
offset=offset
)
Or, as suggested in my previous response, you can use the simpler form by filtering directly on `state_type`:
python
task_filter = TaskRunFilter(
tags=TaskRunFilterTags(all_=[tag]),
state_type=TaskRunFilterStateType(
any_=[
StateType.RUNNING.value,
StateType.PENDING.value,
StateType.CANCELLING.value,
StateType.SCHEDULED.value,
StateType.PAUSED.value
]
)
)
task_runs = await client.read_task_runs(
task_run_filter=task_filter,
sort=TaskRunSort.NAME_ASC,
limit=limit,
offset=offset
)
The second approach (using state_type
directly) is simpler and should achieve what you want - finding task runs that have the specified tag AND are in any of the specified states.Giacomo Chiarella
07/03/2025, 3:50 PMMarvin
07/03/2025, 3:50 PMMarvin
07/03/2025, 3:51 PMpython
from prefect.client import get_client
from prefect.client.schemas.filters import (
TaskRunFilter,
TaskRunFilterTags,
TaskRunFilterState,
TaskRunFilterStateName
)
from prefect.states import StateType
async def get_filtered_task_runs(tag: str):
client = get_client()
task_filter = TaskRunFilter(
tags=TaskRunFilterTags(all_=[tag]),
state=TaskRunFilterState(
name=TaskRunFilterStateName(
any_=[
"RUNNING",
"PENDING",
"CANCELLING",
"SCHEDULED",
"PAUSED"
]
)
)
)
task_runs = await client.read_task_runs(
task_run_filter=task_filter,
sort=TaskRunSort.NAME_ASC,
limit=limit,
offset=offset
)
return task_runs
The key changes are:
1. We're using just the state names directly as strings
2. We're using TaskRunFilterStateName
instead of TaskRunFilterStateType
3. The state names should be uppercase strings
This should work with Prefect 3.2.14. Let me know if you're still seeing any errors with this approach.Giacomo Chiarella
07/05/2025, 1:21 PMMarvin
07/05/2025, 1:21 PMMarvin
07/05/2025, 1:22 PMbash
prefect deployment run 'flow-name/deployment-name'
2. Run with custom parameters:
bash
prefect deployment run 'flow-name/deployment-name' \
--param param1=value1 \
--param param2=value2 \
--json '{"param3": "value3"}' \
--scheduled-time "2024-01-01T00:00:00Z"
Key options you can use:
- `--param`: Specify individual parameters
- `--json`: Pass all parameters as a JSON string
- `--scheduled-time`: Schedule the flow run for a specific time
- `--params-file`: Load parameters from a JSON or YAML file
- `--name`: Give the flow run a custom name
For example:
bash
# Simple run
prefect deployment run 'etl-flow/daily-run'
# Run with parameters
prefect deployment run 'etl-flow/daily-run' \
--param date="2024-01-01" \
--param batch_size=1000
# Run with scheduled time
prefect deployment run 'etl-flow/daily-run' \
--scheduled-time "2024-01-01T00:00:00Z"
# Run with parameters from file
prefect deployment run 'etl-flow/daily-run' \
--params-file params.json
Note: Make sure to use the correct format for the deployment name: flow-name/deployment-name
. You can see all available deployments by running prefect deployment ls
.Giacomo Chiarella
08/07/2025, 8:16 AMEncountered exception during execution: ObjectNotFound(None)
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration/__init__.py", line 843, in read_task_run
response = await self._client.get(f"/task_runs/{task_run_id}")
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1768, in get
return await self.request(
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1540, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 354, in send
response.raise_for_status()
File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 162, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '<http://prefect_orion:4200/api/task_runs/fe75a8a6-5330-4648-88f1-d4525dfb1782>'
Response: {'detail': 'Task not found'}
For more information check: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/404>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 767, in run_context
yield self
File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 1379, in run_flow_sync
engine.call_flow_fn()
File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 787, in call_flow_fn
result = call_with_parameters(self.flow.fn, self.parameters)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
return fn(*args, **kwargs)
File "/tmp/tmp9bwmbsn2prefect/my_bucket/etl/ecs_cpu_utilization.py", line 376, in flow_entrypoint
main(dag_name=DAG_NAME, default_config=DEFAULT_CONFIG, datetime_ftm=DATETIME_FTM,
File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 1196, in main
calculate_flow_state(dag_name, prefect_futures)
File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 667, in calculate_flow_state
task_info = pack_task_info(prefect_futures=prefect_futures)
File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 655, in pack_task_info
asyncio.run(wait(prefect_future))
File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 641, in wait
task_run_name = await get_task_run_name_by_id(future.task_run_id, retries, retry_delay_seconds)
File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 634, in get_task_run_name_by_id
raise e
File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 630, in get_task_run_name_by_id
task_run = await client.read_task_run(task_run_id)
File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration/__init__.py", line 847, in read_task_run
raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
prefect.exceptions.ObjectNotFound: None
the function get_task_run_name_by_id retrieves the task run name by its id and the id existsMarvin
08/07/2025, 8:16 AMMarvin
08/07/2025, 8:17 AMpython
from prefect.utilities.asyncutils import sync_compatible
from asyncio import sleep
from prefect import get_client
@sync_compatible
async def get_task_run_name_by_id(task_run_id, max_retries=3, delay_seconds=1):
client = get_client()
for attempt in range(max_retries):
try:
task_run = await client.read_task_run(task_run_id)
return task_run.name
except prefect.exceptions.ObjectNotFound:
if attempt == max_retries - 1:
raise
await sleep(delay_seconds)
2. Verify Server Connection: Check that your Prefect server is running and accessible:
bash
prefect server status
3. Check Task Run Status: Before attempting to access the task run, verify it exists and is in the expected state using the Prefect UI or CLI.
If you're still experiencing issues, it would be helpful to know:
1. Are you running Prefect in Docker?
2. Can you access the Prefect UI and see the task run there?
3. Does this happen consistently or intermittently?Giacomo Chiarella
08/18/2025, 1:52 PMMarvin
08/18/2025, 1:52 PMMarvin
08/18/2025, 1:55 PMVariable.get
from the Prefect SDK.
Basic read:
from prefect.variables import Variable
value = Variable.get("MY_VAR") # raises if not set
Provide a default to avoid errors:
value = Variable.get("MY_VAR", default="fallback")
If you want to handle the missing case explicitly:
from prefect.exceptions import MissingVariableError
try:
value = Variable.get("MY_VAR")
except MissingVariableError:
value = "fallback"
Note: Variables are for non-sensitive config. Don’t use them for secrets.
Docs:
- Concepts: Variables in Prefect 3
- Python API: prefect.variables APIGiacomo Chiarella
08/18/2025, 1:57 PMMarvin
08/18/2025, 1:57 PMMarvin
08/18/2025, 1:57 PMfrom prefect.variables import Variable
from prefect.exceptions import MissingVariableError
def get_or_create_variable(name: str, default):
try:
return Variable.get(name)
except MissingVariableError:
Variable.set(name, default, overwrite=False)
return default
value = get_or_create_variable("XYZ", "WHAT")
Notes:
- Variable.get("XYZ")
raises MissingVariableError
if it doesn’t exist; providing default=
will not create it in Prefect.
- Variable values must be JSON-serializable, not necessarily a JSON string. Python types like str, int, float, bool, None, dict, and list are fine. For example:
- Variable.set("XYZ", {"a": 1})
is valid.
- Variable.get("XYZ")
will return {"a": 1}
.
Docs:
- Concepts: Variables in Prefect 3
- Python API: prefect.variables APIGiacomo Chiarella
08/18/2025, 2:00 PMMarvin
08/18/2025, 2:00 PMMarvin
08/18/2025, 2:01 PMObjectNotFound
, not MissingVariableError
(that was 2.x).
Here’s a “get or create” that works in 3.x:
from prefect.variables import Variable
from prefect.exceptions import ObjectNotFound
def get_or_create_variable(name: str, default):
try:
return Variable.get(name)
except ObjectNotFound:
Variable.set(name, default, overwrite=False)
return default
value = get_or_create_variable("XYZ", "WHAT")
Notes:
- Variable values must be JSON-serializable (str, int, float, bool, None, list, dict, etc.). You don’t need to pass a JSON string; plain Python types are fine.
- default=
on Variable.get
only returns the default; it does not create the variable in Prefect.
Docs:
- Concepts: Variables in Prefect 3
- API: prefect.variables APIGiacomo Chiarella
08/18/2025, 3:43 PMMarvin
08/18/2025, 3:43 PMMarvin
08/18/2025, 3:44 PMVariable.get
and Variable.set
are synchronous in Prefect 3.x, so your get_or_create_variable
is a normal function and does not need await
.
Example in a flow:
from prefect import flow
from prefect.variables import Variable
from prefect.exceptions import ObjectNotFound
def get_or_create_variable(name: str, default):
try:
return Variable.get(name)
except ObjectNotFound:
Variable.set(name, default, overwrite=False)
return default
@flow
def my_flow():
v = get_or_create_variable("XYZ", "WHAT")
print(v)
my_flow()
If you wrote an async flow and want to be extra polite to the event loop, you can offload the sync call:
import anyio
v = await anyio.to_thread.run_sync(get_or_create_variable, "XYZ", "WHAT")
Docs:
- Concepts: Variables in Prefect 3
- API: prefect.variables APIGiacomo Chiarella
08/18/2025, 7:09 PMobject Variable can't be used in 'await' expression
Marvin
08/18/2025, 7:09 PMMarvin
08/18/2025, 7:10 PMawait
something that isn’t awaitable. Prefect variables are synchronous.
Do this (no await):
from prefect.variables import Variable
value = Variable.get("XYZ", default="WHAT") # does not create it
If you want “get or create”:
from prefect.variables import Variable
from prefect.exceptions import ObjectNotFound
def get_or_create_variable(name: str, default):
try:
return Variable.get(name)
except ObjectNotFound:
Variable.set(name, default, overwrite=False)
return default
v = get_or_create_variable("XYZ", "WHAT") # no await
In an async flow, still no await:
from prefect import flow
@flow
async def my_flow():
v = get_or_create_variable("XYZ", "WHAT") # sync call is fine
If you accidentally wrote async def get_or_create_variable(...):
, change it to a plain def
.
Docs:
- Variables in Prefect 3
- prefect.variables APIGiacomo Chiarella
08/18/2025, 7:11 PMdef get_variable(name: str, default):
try:
return Variable.get(name)
except:
Variable.set(name, default, overwrite=True)
return default
from within a prefect task. I’m calling it like
get_variable("run_spark", True)
I expect to get True and have run_spark variable created, instead I get None and the variable is not createdMarvin
08/18/2025, 7:12 PMMarvin
08/18/2025, 7:13 PMNone
because Variable.get
doesn’t raise when missing in Prefect 3.x — it returns the default
you pass, and since the default parameter itself defaults to None
, you got exactly that. Your except
never ran, so nothing was created.
Fix by using a sentinel so you can detect “missing” and then create:
from prefect.variables import Variable
_MISSING = object()
def get_or_create_variable(name: str, default):
value = Variable.get(name, default=_MISSING)
if value is _MISSING:
# Only create if it doesn't exist
Variable.set(name, default, overwrite=False)
return default
return value
# usage
v = get_or_create_variable("run_spark", True)
Notes:
- Prefer overwrite=False
so you don’t clobber an existing value by accident.
- Values must be JSON-serializable (bools like True
are fine).
- No await
needed; these calls are synchronous.
Verify it was created:
prefect variable get run_spark
# or
prefect variable inspect run_spark
Docs:
- Concepts: Variables in Prefect 3
- API: prefect.variables APIBring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by