Arthur
01/03/2025, 12:58 PM
async def cancel_flow_runs(flow_runs):
    async with get_client() as client:
        for idx, flow_run in enumerate(flow_runs):
            print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
            state_updates = {}
            state_updates.setdefault("name", "Cancelled")
            state_updates.setdefault("type", StateType.CANCELLED)
            state = flow_run.state.copy(update=state_updates)
            await client.set_flow_run_state(flow_run.id, state, force=True)
but it fails with TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'. Why?
Marvin
01/03/2025, 12:58 PM
The error, TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer', appears to be related to a serialization issue in Prefect. This type of error has been reported in various contexts, particularly when using certain task runners like DaskTaskRunner, as seen in this GitHub issue.
Here are a few steps you might consider to resolve or work around the issue:
1. Check Prefect Version: Ensure that you are using the latest version of Prefect, as serialization issues might have been addressed in more recent updates. You can check the latest release notes for any fixes related to serialization.
2. Review Task Runner: If you are using a specific task runner, try running your code with a different runner or without a task runner to see if the issue persists. This can help isolate whether the problem is specific to a certain runner.
3. Inspect Serialization: The error suggests a problem with converting objects during serialization. Make sure that any custom objects or complex data structures in your flow are serializable. If necessary, simplify the data being passed around.
4. Consult Documentation: Review the Prefect documentation on cancellation and serializers to ensure that your implementation aligns with recommended practices.
If the problem persists, you might want to report it on the Prefect GitHub repository or seek further assistance from the Prefect community.
Arthur
01/03/2025, 12:59 PM
Marvin
01/03/2025, 12:59 PM
One way to cancel flow runs is the set_flow_run_state method from the Prefect client, as you have already attempted. However, there are other ways to cancel flows:
1. Command Line Interface (CLI): You can cancel a flow run using the Prefect CLI by executing the command:
bash
prefect flow-run cancel <flow-run-id>
This requires the flow run ID and can be executed from your terminal.
2. Prefect UI: You can navigate to the Prefect UI, find the flow run you wish to cancel, and use the interface to cancel it directly.
3. Python Script: If you prefer a more programmatic approach, you can use the Prefect API to interact with flow runs. This involves using the Prefect client to change the state of the flow run to "Cancelled".
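For more than a handful of runs, the CLI route can be scripted. The following is a minimal sketch, assuming the prefect executable is on PATH and already configured for the right API. The helper names build_cancel_command and cancel_via_cli are hypothetical and not part of Prefect.

```python
import subprocess
from typing import Iterable, List


def build_cancel_command(flow_run_id) -> List[str]:
    """Return the argv for `prefect flow-run cancel <flow-run-id>`."""
    return ["prefect", "flow-run", "cancel", str(flow_run_id)]


def cancel_via_cli(flow_run_ids: Iterable) -> List[str]:
    """Run the CLI once per ID and return the IDs whose command failed."""
    failed = []
    for flow_run_id in flow_run_ids:
        # check=False so that one failing run does not abort the whole batch.
        result = subprocess.run(build_cancel_command(flow_run_id), check=False)
        if result.returncode != 0:
            failed.append(str(flow_run_id))
    return failed
```

Calling cancel_via_cli([fr.id for fr in flow_runs]) would return the IDs that could not be cancelled. If the prefect executable is missing, subprocess.run raises FileNotFoundError instead.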
For more detailed guidance, you can refer to the Prefect documentation on cancellation.
Arthur
01/03/2025, 1:00 PM
Marvin
01/03/2025, 1:00 PM
You can use the PrefectClient class directly to manage the flow run state. Here's a revised version of your function:
python
from prefect.client.orchestration import get_client
from prefect.client.schemas import StateType

async def cancel_flow_runs(flow_runs):
    # get_client() returns a configured PrefectClient; calling PrefectClient()
    # with no arguments fails because its `api` argument is required.
    async with get_client() as client:
        for idx, flow_run in enumerate(flow_runs):
            print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
            # Fields to override on a copy of the run's current state.
            state_updates = {
                "name": "Cancelled",
                "type": StateType.CANCELLED,
            }
            state = flow_run.state.copy(update=state_updates)
            await client.set_flow_run_state(flow_run.id, state, force=True)
Make sure that the Prefect client and StateType are correctly imported from the Prefect library. This code should help manage the cancellation of flow runs by setting their state to "Cancelled". If the issue persists, double-check the flow run object and its state to ensure they are compatible with the Prefect client's expectations.
Arthur
01/03/2025, 1:00 PM
What type should flow_runs be?
Marvin
01/03/2025, 1:00 PM
The flow_runs parameter in your function should be a list of FlowRun objects. The FlowRun class is part of the prefect.client.schemas.objects module. Each FlowRun object should have properties like id, name, and state, which are used in your function to update and manage the flow run's state.
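One way to obtain such FlowRun objects is to read them through the client with a state filter. This is a sketch under assumptions: filtering on PENDING is only an illustrative choice, and missing_fields and fetch_pending_flow_runs are hypothetical helper names. The Prefect imports are deferred into the function so that the attribute check can be used on its own.

```python
from typing import Any, List

# Attributes the cancellation loop reads from each flow run.
REQUIRED_FIELDS = ("id", "name", "state")


def missing_fields(flow_run: Any) -> List[str]:
    """Return the required attribute names that are absent or None on the object."""
    return [field for field in REQUIRED_FIELDS if getattr(flow_run, field, None) is None]


async def fetch_pending_flow_runs(limit: int = 50) -> list:
    """Read flow runs in a PENDING state via the Prefect client."""
    # Imported here so the helper above works without Prefect installed.
    from prefect.client.orchestration import get_client
    from prefect.client.schemas.filters import (
        FlowRunFilter,
        FlowRunFilterState,
        FlowRunFilterStateType,
    )
    from prefect.client.schemas.objects import StateType

    pending_only = FlowRunFilter(
        state=FlowRunFilterState(
            type=FlowRunFilterStateType(any_=[StateType.PENDING])
        )
    )
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(flow_run_filter=pending_only, limit=limit)

    # Keep only objects that expose everything the cancellation loop uses.
    return [fr for fr in flow_runs if not missing_fields(fr)]
```

The result of asyncio.run(fetch_pending_flow_runs()) could then be passed to cancel_flow_runs.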
Ensure that you are creating or retrieving these FlowRun objects correctly before passing them to your cancel_flow_runs function.
Arthur
01/03/2025, 1:04 PM
Marvin
01/03/2025, 1:04 PM
PrefectClient and StateType classes, which are core parts of the Prefect library. However, if you encounter any issues, consider updating to the latest version to benefit from recent bug fixes and enhancements. You can review the full changelog here for more details on recent changes.
Arthur
01/03/2025, 1:04 PM
Traceback (most recent call last):
  File "/Users/arthurhussey/orbital-git/orb/libs/chem/prefect_script.py", line 59, in <module>
    asyncio.run(cancel_flow_runs(flows_with_pending_subflows))
  File "/Users/arthurhussey/miniconda3/envs/core/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/arthurhussey/miniconda3/envs/core/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/arthurhussey/miniconda3/envs/core/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/arthurhussey/orbital-git/orb/libs/chem/prefect_script.py", line 51, in cancel_flow_runs
    await client.set_flow_run_state(flow_run.id, state, force=True)
  File "/Users/arthurhussey/miniconda3/envs/core/lib/python3.11/site-packages/prefect/client/orchestration.py", line 2180, in set_flow_run_state
    state=state_create.model_dump(mode="json", serialize_as_any=True),
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/arthurhussey/miniconda3/envs/core/lib/python3.11/site-packages/pydantic/main.py", line 426, in model_dump
    return self.__pydantic_serializer__.to_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'
Marvin
01/03/2025, 1:04 PM
Arthur
01/03/2025, 1:04 PM
, in set_flow_run_state
    state=state_create.model_dump(mode="json", serialize_as_any=True),
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/arthurhussey/miniconda3/envs/core/lib/python3.11/site-packages/pydantic/main.py", line 426, in model_dump
    return self.__pydantic_serializer__.to_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'
Marvin
01/03/2025, 1:05 PM
The error, 'MockValSer' object cannot be converted to 'SchemaSerializer', is likely related to the serialization changes in Prefect 3.x, which uses Pydantic 2. Prefect 3.x introduced significant changes in how data is serialized, and existing data from Prefect 2.x might not be compatible.
Here are some steps to address this issue:
1. Ensure Compatibility: Double-check that all objects being passed to the set_flow_run_state method are compatible with the current Prefect 3.x serialization requirements. This might involve updating how these objects are created or manipulated.
2. Update Prefect: Consider updating to the latest version of Prefect, as newer versions might have addressed serialization issues or provided better error handling.
3. Debug Serialization: If the issue persists, try to identify which specific object or state is causing the serialization failure. You might need to manually inspect or log the state before passing it to set_flow_run_state.
4. Consult Prefect Documentation: Look for any additional guidance or updates in the Prefect documentation regarding serialization and state management.
If these steps do not resolve the issue, you may need to reach out to the Prefect community for more specific guidance, especially if this is a recurring issue with certain data structures.
Arthur
01/03/2025, 1:05 PM
Marvin
01/03/2025, 1:05 PM
The state that you pass to the set_flow_run_state method should be an instance of the State class from the prefect.client.schemas.objects module. This class is designed to represent the state of a flow run, and you'll need to ensure that it is correctly instantiated and populated with the necessary attributes before passing it to the method.
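The state passed to set_flow_run_state can also be built from scratch instead of copied from the run's current state. A minimal sketch, assuming the Cancelled helper from prefect.states is available in the installed version; cancel_with_client is a hypothetical helper name, and nothing in this thread confirms that a fresh state avoids the MockValSer error.

```python
async def cancel_with_client(client, flow_runs, state_factory):
    """Set each flow run to a freshly built state and return the IDs processed."""
    cancelled = []
    for idx, flow_run in enumerate(flow_runs, start=1):
        print(f"[{idx}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
        # A new state object per run, rather than a copy of flow_run.state.
        await client.set_flow_run_state(flow_run.id, state_factory(), force=True)
        cancelled.append(flow_run.id)
    return cancelled


async def cancel_flow_runs(flow_runs):
    """Cancel the given flow runs using a fresh Cancelled state for each."""
    # Imported here so cancel_with_client can be exercised without Prefect installed.
    from prefect.client.orchestration import get_client
    from prefect.states import Cancelled

    async with get_client() as client:
        return await cancel_with_client(client, flow_runs, Cancelled)
```

Passing the client and the state factory as arguments keeps the loop testable with a stub client before it is pointed at a real API.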
Make sure that any updates or manipulations to the state object comply with the serialization requirements of Prefect 3.x. If you continue to face issues, it might be beneficial to create a new State object rather than copying and updating an existing one.
Arthur
01/03/2025, 1:13 PM
Marvin
01/03/2025, 1:13 PM
bash
pip install "pydantic==2.*"
This should help in aligning the serialization processes with Prefect's expectations.
Arthur
01/03/2025, 1:24 PM
print(type(state))
print(state.to_state_create().model_dump(mode="json", serialize_as_any=True))
the output is
<class 'prefect.client.schemas.objects.State'>
and then the failure message. Why is this wrong?
Marvin
01/03/2025, 1:24 PM
The State object you are working with should be converted to a StateCreate object before performing operations like model_dump. The StateCreate class is part of the prefect.client.schemas.actions module and is designed for creating new state representations.
Here's a potential adjustment you can make:
1. Ensure that you have a StateCreate instance. If you are converting a State object, make sure you are using the correct method to obtain a StateCreate instance.
2. Use the StateCreate class directly, if needed, to ensure proper serialization.
If the to_state_create() method is supposed to convert your State object to a StateCreate object but fails, verify that the conversion logic is correct and aligns with Prefect's expectations for state transitions.
Arthur
01/03/2025, 1:28 PM
Marvin
01/03/2025, 1:28 PM
Arthur
01/03/2025, 1:28 PM
Marvin
01/03/2025, 1:28 PM
Arthur
01/03/2025, 1:29 PM
Marvin
01/03/2025, 1:29 PM
The function set_flow_run_state you've provided is designed to set the state of a flow run using a State object. The key part of this function is the conversion of the State object to a StateCreate object using state.to_state_create(). This conversion is necessary because the API expects a StateCreate representation when setting the state.
If you are encountering an error during this conversion, it might be due to:
1. Incorrect State Object: Ensure that the State object you are using is correctly initialized and contains all necessary information before conversion.
2. Pydantic Serialization: The error might stem from Pydantic's serialization process. Make sure your environment is using Pydantic 2.x, as required by Prefect 3.x.
3. State Details: Check that the state_details attribute within your State object is properly populated, especially flow_run_id and transition_id.
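The version point can be checked from inside the same environment that runs the script. A small sketch using only the standard library; major_version and installed_major are hypothetical helper names.

```python
from importlib import metadata
from typing import Optional


def major_version(version: str) -> int:
    """Extract the leading integer from a version string such as '2.10.3'."""
    return int(version.split(".", 1)[0])


def installed_major(package: str) -> Optional[int]:
    """Return the installed major version of `package`, or None if it is not installed."""
    try:
        return major_version(metadata.version(package))
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Prefect 3.x is expected to be paired with Pydantic 2.x.
    for package in ("prefect", "pydantic"):
        print(package, installed_major(package))
```

Running this with the interpreter from the traceback (the miniconda "core" environment) shows which versions are actually in use there, which may differ from what pip reports in another shell.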
Given the function's reliance on state.to_state_create(), double-check that this method is implemented correctly in your State class, and ensure that all necessary fields are being set before calling model_dump.
If the problem persists, it may be beneficial to debug by logging the state object before and after conversion to identify any discrepancies in expected attributes or data types.
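A sketch of what that logging could look like. It relies on one fact from Pydantic rather than from this thread: MockValSer is the placeholder Pydantic 2 installs on a model class whose schema has not been fully built, and such classes have __pydantic_complete__ set to False. The helper names incomplete_models, describe_state, and check_prefect_models are hypothetical.

```python
from typing import Iterable, List


def incomplete_models(model_classes: Iterable[type]) -> List[str]:
    """Return the names of classes whose `__pydantic_complete__` flag is False."""
    return [
        cls.__name__
        for cls in model_classes
        # Classes without the flag (not Pydantic 2 models) are treated as complete.
        if getattr(cls, "__pydantic_complete__", True) is False
    ]


def describe_state(state) -> dict:
    """Collect a few facts about a state object that are safe to log."""
    serializer = getattr(type(state), "__pydantic_serializer__", None)
    return {
        "class": f"{type(state).__module__}.{type(state).__qualname__}",
        "type": getattr(state, "type", None),
        "name": getattr(state, "name", None),
        # Reads "MockValSer" when the class has no real serializer yet.
        "serializer": type(serializer).__name__,
    }


def check_prefect_models() -> List[str]:
    """Check the Prefect schema classes involved in set_flow_run_state."""
    # Imported lazily so the helpers above can be used without Prefect installed.
    from prefect.client.schemas.actions import StateCreate
    from prefect.client.schemas.objects import State, StateDetails

    return incomplete_models([State, StateCreate, StateDetails])
```

Printing describe_state(state) and describe_state(state.to_state_create()) just before the failing call, along with check_prefect_models(), would show whether any of these classes is still unbuilt. If one is listed, calling its model_rebuild() method is Pydantic's documented way to finish building it, though whether that resolves this particular failure is untested here.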