<@ULVA73B9P> Hello, I'm encountering an issue with...
# ask-marvin
t
@Marvin Hello, I'm encountering an issue with the Prefect client.
Copy code
flow_run = await client.create_flow_run_from_deployment(
                deployment.id,
                parameters=parameters or {}
            )
Copy code
File "/service/prefect_service.py", line 30, in run_deployment
    flow_run = await client.create_flow_run_from_deployment(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/prefect/client/orchestration/_deployments/client.py", line 1328, in create_flow_run_from_deployment
    state=to_state_create(state),
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/prefect/states.py", line 65, in to_state_create
    return StateCreate(
           ^^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/pydantic/main.py", line 253, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.venv/lib/python3.11/site-packages/pydantic/_internal/_mock_val_ser.py", line 100, in __getattr__
    raise PydanticUserError(self._error_message, code=self._code)
PydanticUserError: `StateCreate` is not fully defined; you should define all referenced types, then call `StateCreate.model_rebuild()`.
m
thought for 43.1 seconds
Based on the GitHub issues and research, I can help you resolve this error. This is a known issue related to import ordering in Prefect 3.x. There are two ways to fix this: 1. Recommended Solution: Import from the main
prefect
module first:
Copy code
python
from prefect import flow  # or any other prefect import
from prefect.client import get_client  # your other imports
2. Alternative Solution: If you're creating a state directly, use a simpler state creation approach:
Copy code
python
from prefect.states import Scheduled
# Create scheduled state without complex data structures
state = Scheduled(scheduled_time=datetime.now(timezone.utc))
The error occurs because of Pydantic's validation system and some circular import dependencies in Prefect. The error message
StateCreate is not fully defined
happens when the Pydantic models haven't been fully initialized due to the order of imports. This issue has been documented in Issue #15957 and Issue #15646. To fix your specific code, modify it to ensure you have a top-level import from the main
prefect
module before using the client:
Copy code
python
from prefect import flow  # Add this import first
from prefect.client import get_client

# Your existing code
flow_run = await client.create_flow_run_from_deployment(
    deployment.id,
    parameters=parameters or {}
)
If you're still experiencing issues after trying these solutions, please let me know and share: 1. Your complete import statements 2. The Prefect version you're using (
prefect version
)
t
@Marvin Thank you. How can I use read_flow_runs to filter by deployment and parameters?
m
thought for 68.1 seconds
Based on the research and available filters, here's how you can use
read_flow_runs
to filter by deployment and parameters in Prefect 3.x:
Copy code
python
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterDeploymentId

async with get_client() as client:
    # Filter by deployment ID
    flow_runs = await client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            deployment_id=FlowRunFilterDeploymentId(any_=["your-deployment-id"])
        )
    )
For parameters, you'll need to filter the results programmatically as parameters are stored as part of the flow run data. Here's a complete example:
Copy code
python
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterDeploymentId

async def get_flow_runs_by_deployment_and_params(
    deployment_id: str,
    parameter_filters: dict
):
    async with get_client() as client:
        # First filter by deployment
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                deployment_id=FlowRunFilterDeploymentId(any_=[deployment_id])
            )
        )
        
        # Then filter by parameters
        filtered_runs = [
            run for run in flow_runs 
            if all(
                run.parameters.get(key) == value 
                for key, value in parameter_filters.items()
            )
        ]
        
        return filtered_runs

# Usage example:
deployment_id = "your-deployment-id"
params_to_filter = {"param1": "value1", "param2": "value2"}
matching_runs = await get_flow_runs_by_deployment_and_params(
    deployment_id,
    params_to_filter
)
Additional filtering options available include: -
FlowRunFilterState
- Filter by state type or name -
FlowRunFilterStartTime
- Filter by when the flow run started -
FlowRunFilterTags
- Filter by tags -
FlowRunFilterName
- Filter by flow run name For example, to add state filtering:
Copy code
python
from prefect.client.schemas.filters import FlowRunFilterState

flow_runs = await client.read_flow_runs(
    flow_run_filter=FlowRunFilter(
        deployment_id=FlowRunFilterDeploymentId(any_=[deployment_id]),
        state=FlowRunFilterState(
            type={"any_": ["COMPLETED"]}  # or "FAILED", "PENDING", etc.
        )
    )
)
Note that the filters are combined with AND logic, so all conditions must be met for a flow run to be included in the results. Let me know if you need help with any specific filtering scenarios or if you'd like to see examples of other filter combinations!
t
@MarvinI have listed all the flow run IDs. How can I retrieve them?
m
thought for 21.1 seconds
You can retrieve specific flow runs by their IDs using the
FlowRunFilterId
filter. Here's how to do it:
Copy code
python
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterId

async def get_flow_runs_by_ids(flow_run_ids: list[str]):
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                id=FlowRunFilterId(any_=flow_run_ids)
            )
        )
        return flow_runs

# Usage example:
flow_run_ids = ["id1", "id2", "id3"]  # Your list of flow run IDs
flow_runs = await get_flow_runs_by_ids(flow_run_ids)
If you need to retrieve a single flow run, you can use the
read_flow_run
method instead:
Copy code
python
async with get_client() as client:
    # For a single flow run
    flow_run = await client.read_flow_run(flow_run_id="your-flow-run-id")
The returned flow runs will include all their details such as: - State - Parameters - Start/end times - Tags - Related deployment information You can access these attributes on the returned flow run objects:
Copy code
python
for flow_run in flow_runs:
    print(f"Flow Run ID: {flow_run.id}")
    print(f"State: {flow_run.state.type}")
    print(f"Parameters: {flow_run.parameters}")
    print(f"Start Time: {flow_run.start_time}")
    print("---")
Let me know if you need help accessing specific attributes of the flow runs or if you'd like to apply additional filters along with the ID filtering!