g
@Marvin the endpoint PATCH /api/deployments/{id} returns {'detail': 'Method Not Allowed'}, why?
m
thought for 50.7 seconds
The "Method Not Allowed" error you're seeing suggests that PATCH operations are not supported for the `/api/deployments/{id}` endpoint. This is actually expected behavior - in Prefect 3.x, deployment updates should be handled through the Python client rather than direct API calls. Instead of using a PATCH request, you should:
1. Use the Python client to update deployments:
python
from prefect import get_client

async with get_client() as client:
    # Fetch the deployment
    deployment = await client.read_deployment("deployment-id")
    
    # Update the deployment properties
    deployment.description = "New description"
    
    # Save the changes
    await client.update_deployment(deployment)
2. Or use the CLI:
bash
prefect deployment update my-deployment-name
If you're working on automation that requires HTTP API access, you might want to look into using the Python client in your automation code instead, as it provides a more stable and supported interface for deployment management. The direct REST API endpoints are primarily used by the Prefect UI and the Python client internally. For the most reliable experience, stick to the Python client or CLI for deployment management operations.
g
@Marvin calling await client.read_deployments(limit=limit, offset=total_len, sort=DeploymentSort.NAME_ASC) returns json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
m
thought for 28.2 seconds
This JSONDecodeError typically occurs when the API request fails to return valid JSON, which often means there's either:
1. A connection issue to your Prefect server
2. An authentication problem
3. Invalid parameters in the request
Here's how to troubleshoot:
1. First, verify your Prefect server is running and accessible:
python
from prefect import get_client

async with get_client() as client:
    # Test basic connectivity
    try:
        await client.hello()
    except Exception as e:
        print(f"Connection error: {e}")
2. Check your API URL and authentication:
python
import prefect.settings
print(prefect.settings.PREFECT_API_URL.value())
3. Try the read_deployments call with minimal parameters first:
python
async with get_client() as client:
    try:
        deployments = await client.read_deployments()
        print(f"Found {len(deployments)} deployments")
    except Exception as e:
        print(f"Error: {e}")
4. If that works, add parameters incrementally to identify which one might be causing the issue. Make sure:
- `limit` is a positive integer
- `offset` is a non-negative integer
- `total_len` is actually defined in your code before using it
- `DeploymentSort.NAME_ASC` is properly imported from `prefect.client.schemas`
Can you try these steps and let me know:
1. Are you able to connect to the server at all?
2. Does the basic `read_deployments()` call without parameters work?
3. What version of Prefect are you using? (`prefect version`)
This will help narrow down the exact cause of the JSONDecodeError.
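For context on the message itself, here is a minimal standalone sketch (standard library only, no Prefect involved) of why an empty or non-JSON response body produces this error. The `parse_body` helper and the sample bodies are hypothetical and exist only for illustration.

```python
import json


def parse_body(body: str):
    """Try to decode a response body as JSON; return (data, error_message)."""
    try:
        return json.loads(body), None
    except json.JSONDecodeError as exc:
        return None, str(exc)


# An empty body and an HTML error page both fail at the very first character.
for body in ["", "<html>502 Bad Gateway</html>", '{"ok": true}']:
    data, error = parse_body(body)
    print(repr(body[:20]), "->", data if error is None else error)
```

If something in front of the API (a proxy, a load balancer, a wrong URL) answers with an empty body or an HTML page, looking at the raw response text is usually the quickest way to confirm it.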
g
@Marvin I’m doing this
async with get_client() as client:
    # Fetch the deployment
    deployment = await client.read_deployment("deployment-id")
    
    # Update the deployment properties
    deployment.description = "New description"
    
    # Save the changes
    await client.update_deployment(deployment)
and I get this prefect.exceptions.PrefectHTTPStatusError: Client error ‘422 Unprocessable Entity’
m
thought for 38.6 seconds
The 422 Unprocessable Entity error typically occurs when the deployment update request contains invalid data. There are a few common issues to check:
1. Make sure you're using the actual deployment ID, not the name. The ID should look like a UUID, for example: "12345678-1234-5678-1234-567812345678"
2. When updating a deployment, you need to use a DeploymentUpdate object. Here's the correct way:
python
from prefect import get_client
from prefect.client.schemas.actions import DeploymentUpdate

async with get_client() as client:
    # First get the existing deployment
    deployment = await client.read_deployment("your-deployment-id")
    
    # Create a DeploymentUpdate object with your changes
    deployment_update = DeploymentUpdate(
        description="New description",
        # Keep other fields you want to preserve
        name=deployment.name,
        flow_id=deployment.flow_id,
    )
    
    # Update the deployment
    updated_deployment = await client.update_deployment(
        "your-deployment-id", 
        deployment_update
    )
3. If you only want to update specific fields, you can use a more targeted approach:
python
async with get_client() as client:
    updated_deployment = await client.update_deployment(
        "your-deployment-id",
        DeploymentUpdate(description="New description")
    )
The key differences from your original code are:
- Using `DeploymentUpdate` instead of modifying the deployment object directly
- Passing both the deployment ID and the update object to `update_deployment`
If you're still getting a 422 error, can you share:
1. The complete error message (it should include details about which fields are invalid)
2. The actual deployment ID you're using (with sensitive parts redacted if necessary)
This will help identify exactly what's causing the validation error.
g
@Marvin how do I get all task runs having the tag “my_tag” and state is running, pending, cancelling or scheduled?
m
thought for 96.9 seconds
Here's how you can query task runs with the specified tag and states:
python
from prefect.client import get_client
from prefect.client.schemas.filters import (
    TaskRunFilter,
    TaskRunFilterStateType,
    TaskRunFilterTags,
)

async def get_filtered_task_runs():
    client = get_client()
    
    # Create the filter
    task_filter = TaskRunFilter(
        tags=TaskRunFilterTags(all_=["my_tag"]),
        state_type=TaskRunFilterStateType(
            any_=[
                "RUNNING",
                "PENDING",
                "CANCELLING",
                "SCHEDULED"
            ]
        )
    )
    
    # Query the task runs
    task_runs = await client.read_task_runs(task_filter)
    return task_runs

# Use in an async context
task_runs = await get_filtered_task_runs()
A few important notes:
1. The code needs to run in an async context since the client methods are asynchronous
2. The `all_` parameter in `TaskRunFilterTags` ensures the task run has the specified tag
3. The `any_` parameter in `TaskRunFilterStateType` matches task runs that are in any of the specified states
4. State names should be uppercase strings as shown in the example
You can iterate through the returned task runs to access their properties:
python
for task_run in task_runs:
    print(f"Task Run ID: {task_run.id}")
    print(f"State: {task_run.state.type}")
    print(f"Tags: {task_run.tags}")
The query will return all task runs that match both criteria (having the tag "my_tag" AND being in one of the specified states).
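To make the intended semantics concrete, here is a plain-Python sketch of what "all of these tags AND any of these state types" means. It does not call Prefect; the `matches` helper and the dictionary layout of the sample runs are assumptions for illustration only.

```python
def matches(run: dict, required_tags: list[str], allowed_states: list[str]) -> bool:
    """True when the run carries every required tag AND is in one of the allowed states."""
    has_all_tags = set(required_tags).issubset(run["tags"])
    in_allowed_state = run["state_type"] in allowed_states
    return has_all_tags and in_allowed_state


runs = [
    {"name": "a", "tags": ["my_tag"], "state_type": "RUNNING"},
    {"name": "b", "tags": ["my_tag"], "state_type": "COMPLETED"},
    {"name": "c", "tags": ["other"], "state_type": "PENDING"},
]
wanted = ["RUNNING", "PENDING", "CANCELLING", "SCHEDULED"]
selected = [r["name"] for r in runs if matches(r, ["my_tag"], wanted)]
print(selected)
```

Only run "a" satisfies both conditions; "b" has the tag but the wrong state, and "c" has an allowed state but not the tag. Applying the same check client-side to the returned task runs is one way to confirm whether the server combined the conditions as expected.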
g
@Marvin this filter gives me task runs with a specific tag OR a specific status
task_filter = TaskRunFilter(
    tags=TaskRunFilterTags(all_=[tag]),
    state_type=TaskRunFilterStateType(
        any_=[
            StateType.RUNNING.value,
            StateType.PENDING.value,
            StateType.CANCELLING.value,
            StateType.SCHEDULED.value,
            StateType.PAUSED.value
        ]
    )
)
I want to AND the conditions
m
thought for 39.6 seconds
The issue is that we need to use `TaskRunFilterState` instead of `TaskRunFilterStateType`. Here's the correct way to combine the conditions with AND logic:
python
from prefect.client import get_client
from prefect.client.schemas.filters import (
    TaskRunFilter,
    TaskRunFilterTags,
    TaskRunFilterState,
    TaskRunFilterStateType,  # used below, so it must be imported too
)
from prefect.states import StateType

async def get_filtered_task_runs(tag: str):
    client = get_client()
    
    # Create the filter
    task_filter = TaskRunFilter(
        tags=TaskRunFilterTags(all_=[tag]),
        state=TaskRunFilterState(
            type=TaskRunFilterStateType(
                any_=[
                    StateType.RUNNING,
                    StateType.PENDING,
                    StateType.CANCELLING,
                    StateType.SCHEDULED,
                    StateType.PAUSED,
                ]
            )
        )
    )
    
    # Query the task runs
    task_runs = await client.read_task_runs(task_filter)
    return task_runs
The key differences in this corrected version are:
1. We use `TaskRunFilterState` instead of directly using `TaskRunFilterStateType`
2. The state type filter is nested within the `state` parameter
3. We use `StateType` enum values directly instead of their string values
This will properly return task runs that satisfy BOTH conditions:
- Have the specified tag AND
- Are in one of the specified states
The filter conditions are combined with AND logic by default in the `TaskRunFilter`, so this will give you exactly what you're looking for - task runs that match both criteria simultaneously.
g
@Marvin I’ve run
task_filter = TaskRunFilter(
            tags=TaskRunFilterTags(all_=[tag]),
            state=TaskRunFilterState(
                type=TaskRunFilterStateType(
                    any_=[
                        StateType.RUNNING.value,
                        StateType.PENDING.value,
                        StateType.CANCELLING.value,
                        StateType.SCHEDULED.value,
                        StateType.PAUSED.value
                    ]
                )
            )
        )
task_runs = await client.read_task_runs(task_run_filter=task_filter, sort=TaskRunSort.NAME_ASC, limit=limit,
                                        offset=offset)
but I’ve got
pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskRunFilterState
name
  Field required [type=missing, input_value={'type': TaskRunFilterSta...LED, StateType.PAUSED])}, input_type=dict]
m
thought for 45.9 seconds
I see the issue now. You can simplify this by using just `state_type` directly in the `TaskRunFilter` instead of nesting it under `state`. Here's the corrected version:
python
task_filter = TaskRunFilter(
    tags=TaskRunFilterTags(all_=[tag]),
    state_type=TaskRunFilterStateType(
        any_=[
            StateType.RUNNING.value,
            StateType.PENDING.value,
            StateType.CANCELLING.value,
            StateType.SCHEDULED.value,
            StateType.PAUSED.value
        ]
    )
)

task_runs = await client.read_task_runs(
    task_run_filter=task_filter, 
    sort=TaskRunSort.NAME_ASC, 
    limit=limit,
    offset=offset
)
The key change is that we're using `state_type` directly in the `TaskRunFilter` instead of trying to nest it under a `state` parameter. This should resolve the validation error you're seeing while still maintaining the AND logic between the tags and state type conditions.
g
@Marvin I’ve run
task_filter = TaskRunFilter(
            tags=TaskRunFilterTags(all_=[tag]),
            state=TaskRunFilterState(
                type=TaskRunFilterStateType(
                    any_=[
                        StateType.RUNNING.value,
                        StateType.PENDING.value,
                        StateType.CANCELLING.value,
                        StateType.SCHEDULED.value,
                        StateType.PAUSED.value
                    ]
                )
            )
        )
task_runs = await client.read_task_runs(task_run_filter=task_filter, sort=TaskRunSort.NAME_ASC, limit=limit,
                                        offset=offset)
but I’ve got
pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskRunFilterState
name
  Field required [type=missing, input_value={'type': TaskRunFilterSta...LED, StateType.PAUSED])}, input_type=dict]
there is no state_type anywhere
m
thought for 11.3 seconds
Ah, I see the issue now. According to the error, `TaskRunFilterState` requires a `name` field. Let's modify the filter to include both the state type and name:
python
task_filter = TaskRunFilter(
    tags=TaskRunFilterTags(all_=[tag]),
    state=TaskRunFilterState(
        name=TaskRunFilterStateName(
            any_=[
                StateType.RUNNING.value,
                StateType.PENDING.value,
                StateType.CANCELLING.value,
                StateType.SCHEDULED.value,
                StateType.PAUSED.value
            ]
        ),
        type=TaskRunFilterStateType(
            any_=[
                StateType.RUNNING.value,
                StateType.PENDING.value,
                StateType.CANCELLING.value,
                StateType.SCHEDULED.value,
                StateType.PAUSED.value
            ]
        )
    )
)

task_runs = await client.read_task_runs(
    task_run_filter=task_filter, 
    sort=TaskRunSort.NAME_ASC, 
    limit=limit,
    offset=offset
)
Or, as suggested in my previous response, you can use the simpler form by filtering directly on `state_type`:
python
task_filter = TaskRunFilter(
    tags=TaskRunFilterTags(all_=[tag]),
    state_type=TaskRunFilterStateType(
        any_=[
            StateType.RUNNING.value,
            StateType.PENDING.value,
            StateType.CANCELLING.value,
            StateType.SCHEDULED.value,
            StateType.PAUSED.value
        ]
    )
)

task_runs = await client.read_task_runs(
    task_run_filter=task_filter, 
    sort=TaskRunSort.NAME_ASC, 
    limit=limit,
    offset=offset
)
The second approach (using `state_type` directly) is simpler and should achieve what you want - finding task runs that have the specified tag AND are in any of the specified states.
g
@Marvin in Prefect 3.2.14 there is no state_type attribute anywhere
m
thought for 65.9 seconds
For Prefect 3.2.14, try this approach:
python
from prefect.client import get_client
from prefect.client.schemas.filters import (
    TaskRunFilter,
    TaskRunFilterTags,
    TaskRunFilterState,
    TaskRunFilterStateName
)
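from prefect.client.schemas.sorting import TaskRunSort

# limit and offset are passed in so the query below has everything it needs
async def get_filtered_task_runs(tag: str, limit: int = 100, offset: int = 0):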
    client = get_client()
    
    task_filter = TaskRunFilter(
        tags=TaskRunFilterTags(all_=[tag]),
        state=TaskRunFilterState(
            name=TaskRunFilterStateName(
                any_=[
                    "RUNNING",
                    "PENDING",
                    "CANCELLING",
                    "SCHEDULED",
                    "PAUSED"
                ]
            )
        )
    )
    
    task_runs = await client.read_task_runs(
        task_run_filter=task_filter, 
        sort=TaskRunSort.NAME_ASC, 
        limit=limit,
        offset=offset
    )
    return task_runs
The key changes are:
1. We're using just the state names directly as strings
2. We're using `TaskRunFilterStateName` instead of `TaskRunFilterStateType`
3. The state names should be uppercase strings
This should work with Prefect 3.2.14. Let me know if you're still seeing any errors with this approach.
g
@Marvin how do I trigger a deployment using prefect cli?
m
thought for 40.9 seconds
To trigger a deployment using the Prefect CLI, you have two main options:
1. Quick run with default parameters:
bash
prefect deployment run 'flow-name/deployment-name'
2. Run with custom parameters:
bash
prefect deployment run 'flow-name/deployment-name' \
    --param param1=value1 \
    --param param2=value2 \
    --start-at "2024-01-01T00:00:00Z"
Key options you can use:
- `--param`: Specify an individual parameter as `key=value` (can be repeated)
- `--params`: Pass all parameters as a single JSON string
- `--start-at`: Schedule the flow run for a specific time
- `--start-in`: Schedule the flow run after a delay, such as "5 minutes"
- `--flow-run-name`: Give the flow run a custom name
For example:
bash
# Simple run
prefect deployment run 'etl-flow/daily-run'

# Run with parameters
prefect deployment run 'etl-flow/daily-run' \
    --param date="2024-01-01" \
    --param batch_size=1000

# Run at a scheduled time
prefect deployment run 'etl-flow/daily-run' \
    --start-at "2024-01-01T00:00:00Z"

# Run with parameters read from a JSON file
prefect deployment run 'etl-flow/daily-run' \
    --params "$(cat params.json)"
Note: Make sure to use the correct format for the deployment name: `flow-name/deployment-name`. You can see all available deployments by running `prefect deployment ls`.
g
@Marvin what is this for?
Encountered exception during execution: ObjectNotFound(None)
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration/__init__.py", line 843, in read_task_run
    response = await self._client.get(f"/task_runs/{task_run_id}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1768, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1540, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 354, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 162, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'http://prefect_orion:4200/api/task_runs/fe75a8a6-5330-4648-88f1-d4525dfb1782'

Response: {'detail': 'Task not found'}
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/404


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 767, in run_context
    yield self
  File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 1379, in run_flow_sync
    engine.call_flow_fn()
  File "/usr/local/lib/python3.10/site-packages/prefect/flow_engine.py", line 787, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/tmp/tmp9bwmbsn2prefect/my_bucket/etl/ecs_cpu_utilization.py", line 376, in flow_entrypoint
    main(dag_name=DAG_NAME, default_config=DEFAULT_CONFIG, datetime_ftm=DATETIME_FTM,
  File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 1196, in main
    calculate_flow_state(dag_name, prefect_futures)
  File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 667, in calculate_flow_state
    task_info = pack_task_info(prefect_futures=prefect_futures)
  File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 655, in pack_task_info
    asyncio.run(wait(prefect_future))
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 641, in wait
    task_run_name = await get_task_run_name_by_id(future.task_run_id, retries, retry_delay_seconds)
  File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 634, in get_task_run_name_by_id
    raise e
  File "/usr/local/lib/python3.10/site-packages/dags/prod/common/dag.py", line 630, in get_task_run_name_by_id
    task_run = await client.read_task_run(task_run_id)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration/__init__.py", line 847, in read_task_run
    raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
prefect.exceptions.ObjectNotFound: None
the function get_task_run_name_by_id retrieves the task run name by its id and the id exists
m
thought for 15.7 seconds
Based on your error message and the research, this issue typically occurs in a few scenarios:
1. Race Condition: The task run was deleted or cleaned up before your code tried to access it. This can happen if:
- The flow run was manually terminated
- The task run was deleted while the flow was still running
- There was a cleanup operation that removed the task run
2. Network/Configuration Issues: The Prefect server can't be reached or is misconfigured:
- The URL in your error (`http://prefect_orion:4200`) suggests you're using a Docker setup
- There might be connectivity issues between your client and the Prefect server
3. Timing Issues: If you're trying to access the task run information immediately after it's created or completed, there might be a race condition where the task run hasn't been fully registered in the database yet.
To resolve this, you could try:
1. Add Retry Logic: If you're writing custom code to fetch task run information, add retry logic with a small delay:
python
from prefect.utilities.asyncutils import sync_compatible
from asyncio import sleep
from prefect import get_client
import prefect.exceptions  # needed for prefect.exceptions.ObjectNotFound below

@sync_compatible
async def get_task_run_name_by_id(task_run_id, max_retries=3, delay_seconds=1):
    client = get_client()
    for attempt in range(max_retries):
        try:
            task_run = await client.read_task_run(task_run_id)
            return task_run.name
        except prefect.exceptions.ObjectNotFound:
            if attempt == max_retries - 1:
                raise
            await sleep(delay_seconds)
2. Verify Server Connection: Check that your Prefect server is running and accessible:
bash
prefect server status
3. Check Task Run Status: Before attempting to access the task run, verify it exists and is in the expected state using the Prefect UI or CLI.
If you're still experiencing issues, it would be helpful to know:
1. Are you running Prefect in Docker?
2. Can you access the Prefect UI and see the task run there?
3. Does this happen consistently or intermittently?
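The retry idea can be tried without a server. The following is a minimal sketch using only `asyncio`; `NotFoundYet`, `flaky_lookup`, and `read_with_retry` are hypothetical stand-ins for `ObjectNotFound`, `client.read_task_run`, and your own helper.

```python
import asyncio


class NotFoundYet(Exception):
    """Stand-in for a 'not found' error raised while a record is not yet visible."""


async def read_with_retry(fetch, retries: int = 3, delay_seconds: float = 0.01):
    """Call `fetch()` up to `retries` times, sleeping between failed attempts."""
    for attempt in range(1, retries + 1):
        try:
            return await fetch()
        except NotFoundYet:
            if attempt == retries:
                raise  # out of attempts: surface the original error
            await asyncio.sleep(delay_seconds)


calls = {"count": 0}


async def flaky_lookup():
    """Fails twice, then succeeds, imitating a record that appears late."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise NotFoundYet("task run not visible yet")
    return "my-task-run"


name = asyncio.run(read_with_retry(flaky_lookup))
print(name, "after", calls["count"], "attempts")
```

If the lookup still fails after all attempts, the final `raise` keeps the original exception and traceback, which helps tell a timing problem apart from a task run that really does not exist on that server.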
g
@Marvin how do I read a variable using prefect python sdk?
m
thought for 155.8 seconds
Use `Variable.get` from the Prefect SDK. Basic read:
from prefect.variables import Variable

value = Variable.get("MY_VAR")  # raises if not set
Provide a default to avoid errors:
value = Variable.get("MY_VAR", default="fallback")
If you want to handle the missing case explicitly:
from prefect.exceptions import MissingVariableError

try:
    value = Variable.get("MY_VAR")
except MissingVariableError:
    value = "fallback"
Note: Variables are for non-sensitive config. Don’t use them for secrets. Docs: - Concepts: Variables in Prefect 3 - Python API: prefect.variables API
g
@Marvin I want to read a variable called XYZ. If the variable does not exist I want to write it with a default value WHAT. I’ve seen also that a variable value must be a json
m
thought for 18.2 seconds
You can “get or create” it like this:
from prefect.variables import Variable
from prefect.exceptions import MissingVariableError

def get_or_create_variable(name: str, default):
    try:
        return Variable.get(name)
    except MissingVariableError:
        Variable.set(name, default, overwrite=False)
        return default

value = get_or_create_variable("XYZ", "WHAT")
Notes:
- `Variable.get("XYZ")` raises `MissingVariableError` if it doesn’t exist; providing `default=` will not create it in Prefect.
- Variable values must be JSON-serializable, not necessarily a JSON string. Python types like str, int, float, bool, None, dict, and list are fine. For example:
- `Variable.set("XYZ", {"a": 1})` is valid.
- `Variable.get("XYZ")` will return `{"a": 1}`.
Docs:
- Concepts: Variables in Prefect 3
- Python API: prefect.variables API
g
@Marvin MissingVariableError does not exist
m
thought for 25.1 seconds
Right, my bad. In Prefect 3.x the exception you want is `ObjectNotFound`, not `MissingVariableError` (that was 2.x). Here’s a “get or create” that works in 3.x:
from prefect.variables import Variable
from prefect.exceptions import ObjectNotFound

def get_or_create_variable(name: str, default):
    try:
        return Variable.get(name)
    except ObjectNotFound:
        Variable.set(name, default, overwrite=False)
        return default

value = get_or_create_variable("XYZ", "WHAT")
Notes:
- Variable values must be JSON-serializable (str, int, float, bool, None, list, dict, etc.). You don’t need to pass a JSON string; plain Python types are fine.
- `default=` on `Variable.get` only returns the default; it does not create the variable in Prefect.
Docs:
- Concepts: Variables in Prefect 3
- API: prefect.variables API
g
@Marvin does get_or_create_variable function need await to be called?
m
thought for 27.8 seconds
Short answer: no. `Variable.get` and `Variable.set` are synchronous in Prefect 3.x, so your `get_or_create_variable` is a normal function and does not need `await`. Example in a flow:
from prefect import flow
from prefect.variables import Variable
from prefect.exceptions import ObjectNotFound

def get_or_create_variable(name: str, default):
    try:
        return Variable.get(name)
    except ObjectNotFound:
        Variable.set(name, default, overwrite=False)
        return default

@flow
def my_flow():
    v = get_or_create_variable("XYZ", "WHAT")
    print(v)

my_flow()
If you wrote an async flow and want to be extra polite to the event loop, you can offload the sync call:
import anyio

v = await anyio.to_thread.run_sync(get_or_create_variable, "XYZ", "WHAT")
Docs: - Concepts: Variables in Prefect 3 - API: prefect.variables API
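A standard-library alternative to `anyio.to_thread.run_sync` is `asyncio.to_thread` (Python 3.9+). The sketch below is only an illustration: `slow_lookup` is a hypothetical blocking function standing in for `get_or_create_variable`.

```python
import asyncio
import time


def slow_lookup(name: str, default):
    """Hypothetical blocking call standing in for a synchronous SDK function."""
    time.sleep(0.05)  # simulate blocking I/O
    return default


async def main():
    # Run the blocking function in a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_lookup, "XYZ", "WHAT")


result = asyncio.run(main())
print(result)
```

Note that the `await` applies to the thread wrapper, not to the blocking function itself; the function is passed uncalled, followed by its arguments.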
g
@Marvin
object Variable can't be used in 'await' expression
m
thought for 22.2 seconds
That error means you’re trying to `await` something that isn’t awaitable. Prefect variables are synchronous. Do this (no await):
from prefect.variables import Variable

value = Variable.get("XYZ", default="WHAT")  # does not create it
If you want “get or create”:
from prefect.variables import Variable
from prefect.exceptions import ObjectNotFound

def get_or_create_variable(name: str, default):
    try:
        return Variable.get(name)
    except ObjectNotFound:
        Variable.set(name, default, overwrite=False)
        return default

v = get_or_create_variable("XYZ", "WHAT")  # no await
In an async flow, still no await:
from prefect import flow

@flow
async def my_flow():
    v = get_or_create_variable("XYZ", "WHAT")  # sync call is fine
If you accidentally wrote `async def get_or_create_variable(...):`, change it to a plain `def`.
Docs:
- Variables in Prefect 3
- prefect.variables API
g
@Marvin I’m calling this function
def get_variable(name: str, default):
    try:
        return Variable.get(name)
    except:
        Variable.set(name, default, overwrite=True)
        return default
from within a prefect task. I’m calling it like
get_variable("run_spark", True)
I expect to get True and have run_spark variable created, instead I get None and the variable is not created
m
thought for 115.8 seconds
You’re returning `None` because `Variable.get` doesn’t raise when missing in Prefect 3.x. It returns the `default` you pass, and since the default parameter itself defaults to `None`, you got exactly that. Your `except` never ran, so nothing was created. Fix by using a sentinel so you can detect “missing” and then create:
from prefect.variables import Variable

_MISSING = object()

def get_or_create_variable(name: str, default):
    value = Variable.get(name, default=_MISSING)
    if value is _MISSING:
        # Only create if it doesn't exist
        Variable.set(name, default, overwrite=False)
        return default
    return value

# usage
v = get_or_create_variable("run_spark", True)
Notes:
- Prefer `overwrite=False` so you don’t clobber an existing value by accident.
- Values must be JSON-serializable (bools like `True` are fine).
- No `await` needed; these calls are synchronous.
Verify it was created:
prefect variable get run_spark
# or
prefect variable inspect run_spark
Docs: - Concepts: Variables in Prefect 3 - API: prefect.variables API
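The sentinel pattern can be checked in isolation. This sketch replaces the Prefect variable store with a plain dictionary; `FakeStore` and its `get`/`set` methods are hypothetical and only mimic the "return the default when missing" behavior described above.

```python
_MISSING = object()  # unique marker that no stored value can be identical to


class FakeStore:
    """In-memory stand-in for a variable store that returns a default when a key is absent."""

    def __init__(self):
        self._data = {}

    def get(self, name, default=None):
        return self._data.get(name, default)

    def set(self, name, value):
        self._data[name] = value


def get_or_create(store: FakeStore, name: str, default):
    value = store.get(name, default=_MISSING)
    if value is _MISSING:
        store.set(name, default)  # create only when the key was truly absent
        return default
    return value


store = FakeStore()
first = get_or_create(store, "run_spark", True)   # creates the entry
store.set("run_spark", False)
second = get_or_create(store, "run_spark", True)  # returns the stored value
print(first, second)
```

Comparing with `is _MISSING` rather than testing truthiness matters here: a stored `False` or `None` is a real value and should be returned as-is, not overwritten by the default.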