Tu Vu
08/04/2025, 8:15 AMflow_run = await client.create_flow_run_from_deployment(
deployment.id,
parameters=parameters or {}
)
File "/service/prefect_service.py", line 30, in run_deployment
flow_run = await client.create_flow_run_from_deployment(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/prefect/client/orchestration/_deployments/client.py", line 1328, in create_flow_run_from_deployment
state=to_state_create(state),
^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/prefect/states.py", line 65, in to_state_create
return StateCreate(
^^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/pydantic/main.py", line 253, in __init__
validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/.venv/lib/python3.11/site-packages/pydantic/_internal/_mock_val_ser.py", line 100, in __getattr__
raise PydanticUserError(self._error_message, code=self._code)
PydanticUserError: `StateCreate` is not fully defined; you should define all referenced types, then call `StateCreate.model_rebuild()`.
Marvin
08/04/2025, 8:15 AMMarvin
08/04/2025, 8:16 AMprefect
module first:
python
from prefect import flow # or any other prefect import
from prefect.client import get_client # your other imports
2. Alternative Solution: If you're creating a state directly, use a simpler state creation approach:
python
from prefect.states import Scheduled
# Create scheduled state without complex data structures
state = Scheduled(scheduled_time=datetime.now(timezone.utc))
The error occurs because of Pydantic's validation system and some circular import dependencies in Prefect. The error message StateCreate is not fully defined
happens when the Pydantic models haven't been fully initialized due to the order of imports.
This issue has been documented in Issue #15957 and Issue #15646.
To fix your specific code, modify it to ensure you have a top-level import from the main prefect
module before using the client:
python
from prefect import flow # Add this import first
from prefect.client import get_client
# Your existing code
flow_run = await client.create_flow_run_from_deployment(
deployment.id,
parameters=parameters or {}
)
If you're still experiencing issues after trying these solutions, please let me know and share:
1. Your complete import statements
2. The Prefect version you're using (prefect version
)Tu Vu
08/04/2025, 8:32 AMMarvin
08/04/2025, 8:32 AMMarvin
08/04/2025, 8:33 AMread_flow_runs
to filter by deployment and parameters in Prefect 3.x:
python
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterDeploymentId
async with get_client() as client:
# Filter by deployment ID
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
deployment_id=FlowRunFilterDeploymentId(any_=["your-deployment-id"])
)
)
For parameters, you'll need to filter the results programmatically as parameters are stored as part of the flow run data. Here's a complete example:
python
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterDeploymentId
async def get_flow_runs_by_deployment_and_params(
deployment_id: str,
parameter_filters: dict
):
async with get_client() as client:
# First filter by deployment
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
deployment_id=FlowRunFilterDeploymentId(any_=[deployment_id])
)
)
# Then filter by parameters
filtered_runs = [
run for run in flow_runs
if all(
run.parameters.get(key) == value
for key, value in parameter_filters.items()
)
]
return filtered_runs
# Usage example:
deployment_id = "your-deployment-id"
params_to_filter = {"param1": "value1", "param2": "value2"}
matching_runs = await get_flow_runs_by_deployment_and_params(
deployment_id,
params_to_filter
)
Additional filtering options available include:
- FlowRunFilterState
- Filter by state type or name
- FlowRunFilterStartTime
- Filter by when the flow run started
- FlowRunFilterTags
- Filter by tags
- FlowRunFilterName
- Filter by flow run name
For example, to add state filtering:
python
from prefect.client.schemas.filters import FlowRunFilterState
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
deployment_id=FlowRunFilterDeploymentId(any_=[deployment_id]),
state=FlowRunFilterState(
type={"any_": ["COMPLETED"]} # or "FAILED", "PENDING", etc.
)
)
)
Note that the filters are combined with AND logic, so all conditions must be met for a flow run to be included in the results.
Let me know if you need help with any specific filtering scenarios or if you'd like to see examples of other filter combinations!Tu Vu
08/04/2025, 8:49 AMMarvin
08/04/2025, 8:49 AMMarvin
08/04/2025, 8:49 AMFlowRunFilterId
filter. Here's how to do it:
python
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterId
async def get_flow_runs_by_ids(flow_run_ids: list[str]):
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
id=FlowRunFilterId(any_=flow_run_ids)
)
)
return flow_runs
# Usage example:
flow_run_ids = ["id1", "id2", "id3"] # Your list of flow run IDs
flow_runs = await get_flow_runs_by_ids(flow_run_ids)
If you need to retrieve a single flow run, you can use the read_flow_run
method instead:
python
async with get_client() as client:
# For a single flow run
flow_run = await client.read_flow_run(flow_run_id="your-flow-run-id")
The returned flow runs will include all their details such as:
- State
- Parameters
- Start/end times
- Tags
- Related deployment information
You can access these attributes on the returned flow run objects:
python
for flow_run in flow_runs:
print(f"Flow Run ID: {flow_run.id}")
print(f"State: {flow_run.state.type}")
print(f"Parameters: {flow_run.parameters}")
print(f"Start Time: {flow_run.start_time}")
print("---")
Let me know if you need help accessing specific attributes of the flow runs or if you'd like to apply additional filters along with the ID filtering!