Oz Shaked
11/29/2024, 11:31 AMtimedelta
objects serialization in tasks and flows inputs?
I keep getting "409" error when I'm trying to submit or run deployment of a flow with timedelta
object in the input:
The error
prefect.exceptions.PrefectHTTPStatusError: Client error '409 Conflict' for url '<http://localhost:4200/api/deployments/86fb720a-5462-4796-a0fe-c1d849e7f28f/create_flow_run>'
Response: {'detail': "Error creating flow run: Validation failed for field 'flow_input.forward_buffer'. Failure reason: 60.0 is not valid under any of the given schemas"}
For more information check: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/409>
The input to the flow
class IdentifySegmentsFlowInput(PrefectFlowInputBase):
channel_id: str # The ID of the live stream the flow should process.
recording_url: Optional[str] = None # the recording of this stream
# parameters
top_segment_n: Optional[int] = 10 # The number of segments to identify.
backward_buffer: Optional[timedelta] = timedelta(minutes=1)
forward_buffer: Optional[timedelta] = timedelta(minutes=1)
@field_validator("top_segment_n", "backward_buffer", "forward_buffer", mode="after")
@classmethod
def set_defaults(cls, v, info: ValidationInfo):
if v is None:
# Get the default value from the field definition
field_name = info.field_name
default_value = cls.model_fields[field_name].get_default()
return default_value
return v
Flow singnature:
@prefect.flow(flow_run_name="Identify Segments {flow_input.channel_id}")
async def identify_interesting_segments_flow(
flow_input: IdentifySegmentsFlowInput,
):
Oz Shaked
11/29/2024, 11:42 AM@field_serializer("backward_buffer", "forward_buffer")
def serialize_timedelta(self, value: Optional[timedelta]) -> Optional[str]:
if value is not None:
return duration_isoformat(value) # Convert timedelta to ISO 8601
return value