<@ULVA73B9P> My workflow worked fine in the past....
# ask-marvin
s
@Marvin My workflow worked fine in the past. Now I am getting pydantic errors and I don't know why. What do I need to do to resolve this:
Created subflow run 'turquoise-gorilla' for flow 'download-data'
09:28:50 AM
Info
Encountered exception during execution: 1 validation error for Transaction
key
  Input should be a valid string [type=string_type, input_value=1732210130576103063, input_type=int]
    For further information visit <https://errors.pydantic.dev/2.9/v/string_type>

Traceback (most recent call last):
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "/home/echodyne.int/skroeger/devel/prefect/echoshield_analysis_flow.py", line 49, in echoshield_analysis_flow
    db_path = download_data(
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flows.py", line 1355, in __call__
    return run_flow(
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 821, in run_flow
    return run_flow_sync(**kwargs)
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 701, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 255, in result
    raise self._raised
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
  File "/home/echodyne.int/skroeger/devel/prefect/analysis_tasks.py", line 392, in download_data
    beams_df = get_beams(radar_runs, data_config['beams'], jds)
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/tasks.py", line 1002, in __call__
    return run_task(
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1526, in run_task
    return run_task_sync(**kwargs)
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1339, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 482, in result
    raise self._raised
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 781, in run_context
    yield self
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 1336, in run_task_sync
    with engine.run_context(), engine.transaction_context() as txn:
  File "/usr/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/task_engine.py", line 756, in transaction_context
    with transaction(
  File "/usr/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/transactions.py", line 472, in transaction
    with Transaction(
  File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/pydantic/main.py", line 212, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
pydantic_core._pydantic_core.ValidationError: 1 validation error for Transaction
key
  Input should be a valid string [type=string_type, input_value=1732210130576103063, input_type=int]
    For further information visit <https://errors.pydantic.dev/2.9/v/string_type>
09:28:50 AM
Error
Finished in state Failed('Flow run encountered an exception: ValidationError: 1 validation error for Transaction\nkey\n  Input should be a valid string [type=string_type, input_value=1732210130576103063, input_type=int]\n    For further information visit <https://errors.pydantic.dev/2.9/v/string_type>'
)
09:28:51 AM
Error
Process for flow run 'enlightened-agouti' exited with status code: 1
m
Your message was too long by 1264 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
My workflow worked fine in the past. Now I am getting pydantic errors and I don't know why. What do I need to do to resolve this:
```Created subflow run 'turquoise-gorilla' for flow 'download-data' 092850 AM Info Encountered exception during execution: 1 validation error for Transaction key Input should be a valid string [type=string_type, input_value=1732210130576103063, input_type=int] For further information visit https://errors.pydantic.dev/2.9/v/string_type Traceback (most recent call last): File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 655, in run_context yield self File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync engine.call_flow_fn() File "/home/echodyne.int/skroeger/devel/prefect/.venv/lib/python3.10/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn result = call_with_parameters(self.flow.fn, self.parameters) File "/home/echodyne.int/skroeger
n
this is the important part
Finished in state Failed('Flow run encountered an exception: ValidationError: 1 validation error for Transaction\nkey\n  Input should be a valid string [type=string_type, input_value=1732210130576103063, input_type=int]
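For context on that error: pydantic v2 does not convert an int to a str by default, so a field typed as `str` rejects a value like 1732210130576103063. Here is a minimal standard-library sketch of the same kind of check. The `StrictKeyRecord` class and `try_key` helper are hypothetical stand-ins, not Prefect's `Transaction` or pydantic itself:

```python
from dataclasses import dataclass
from time import time_ns


@dataclass
class StrictKeyRecord:
    """Hypothetical stand-in for a model whose `key` field must be a str."""

    key: str

    def __post_init__(self) -> None:
        # Mimic a strict string check: reject ints instead of converting them.
        if not isinstance(self.key, str):
            raise TypeError(
                f"key should be a valid string, got {type(self.key).__name__}"
            )


def try_key(value) -> str:
    """Return 'ok' if the value is accepted as a key, else the error text."""
    try:
        StrictKeyRecord(key=value)
    except TypeError as exc:
        return str(exc)
    return "ok"


int_result = try_key(time_ns())       # an int key is rejected
str_result = try_key(str(time_ns()))  # the same value as a string is accepted
```

The input value in the traceback has the shape of a nanosecond timestamp, which suggests that something in the flow is producing an int where a string key is expected.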
can you show your flow definition? the flow that this failure was raised from: download-data, it seems like
s
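from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd
from prefect import flow, get_run_logger

# JDSInterface and the tasks called below (get_beams, get_dets, ...) are
# project code not shown here.


@flow
def download_data(
    runs: pd.DataFrame,
    data_dir: Path,
    data_config: dict[str, list[str]],
    fovs: dict[int, dict[str, Any]],
    label_transforms: dict[str, str],
    jds: JDSInterface,
    refresh: bool = True,
) -> dict[str, Path]:
    # Unique radar run IDs in this batch.
    radar_runs = list(runs.radar_run_id.unique())
    # Unique target IDs as ints, skipping NaN; fall back to [0] if none remain.
    radar_targets = [
        int(radar_target_id)
        for radar_target_id in runs.radar_target_id.unique()
        if not np.isnan(radar_target_id)
    ] or [0]

    logger = get_run_logger()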

    if refresh:
        beams_df = get_beams(radar_runs, data_config['beams'], jds)
        dets_df = get_dets(radar_runs, data_config['dets'], jds, label_transforms)
        meas_df = get_meas(radar_runs, data_config['meas'], jds, label_transforms)
        tracks_df = get_tracks(radar_runs, data_config['tracks'], jds, label_transforms)
        logger.warning(tracks_df.label.value_counts())    
        targets_df = get_targets(radar_targets, data_config['targets'], jds, label_transforms)
        targets_df = targets_df.merge(runs[['radar_target_id', 'target_name']].drop_duplicates(), validate="m:1")
        targets_df = set_target_styles(targets_df, fovs)
        assoc_cookies = get_assoc_cookies_task(radar_runs, data_config['cookies'], jds)
    else:
        beams_df = pd.DataFrame(columns=data_config['beams'])
        dets_df = pd.DataFrame(columns=data_config['dets'])
        meas_df = pd.DataFrame(columns=data_config['meas'])
        tracks_df = pd.DataFrame(columns=data_config['tracks'])
        targets_df = pd.DataFrame(columns=data_config['targets'])
        assoc_cookies = pd.DataFrame(columns=data_config['cookies'])

    db_path = populate_db(data_dir, beams_df, dets_df, meas_df, tracks_df, targets_df, assoc_cookies, refresh=refresh)
    # db_path = populate_db(data_dir, tracks_df, targets_df)
    # db_path = populate_db(data_dir, beams_df, meas_df, tracks_df, targets_df, assoc_cookies, refresh=refresh)
    return db_path
Oh so it's probably something with the cache expiration.
Changing the cache expiration function fixed my issue I think.
n
good to hear! feels like there's still something here though, like perhaps we need a str(maybe_int) somewhere. thanks for the context!
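One way to picture the `str(maybe_int)` idea is a small wrapper that coerces whatever a key function returns into a string. This is only a sketch: `stringify_key` is a hypothetical helper, not part of Prefect, and `time_ns` is assumed to come from the standard `time` module.

```python
from functools import wraps
from time import time_ns
from typing import Any, Callable, Optional


def stringify_key(key_fn: Callable[..., Any]) -> Callable[..., Optional[str]]:
    """Wrap a key function so its result is always a str (or None)."""

    @wraps(key_fn)
    def wrapper(*args: Any, **kwargs: Any) -> Optional[str]:
        key = key_fn(*args, **kwargs)
        # Pass None through unchanged; convert any other value to a string.
        return None if key is None else str(key)

    return wrapper


@stringify_key
def force_refresh(*args, **kwargs):
    # An int nanosecond timestamp, converted to str by the wrapper.
    return time_ns()


key = force_refresh()
```

With the wrapper in place, the function body can keep returning an int while callers only ever see a string.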
s
I was using:
def force_refresh(*args, **kwargs):
    return time_ns()
I changed it to:
def force_refresh(*args, **kwargs):
    return datetime.now().isoformat()
👍 1
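A quick side-by-side check of the two versions, assuming `time_ns` comes from `time` and `datetime` from the `datetime` module (the thread does not show the imports). The names `force_refresh_int` and `force_refresh_iso` are used here only so both can be compared in one script:

```python
from datetime import datetime
from time import time_ns


def force_refresh_int(*args, **kwargs):
    # First version: returns an int such as 1732210130576103063.
    return time_ns()


def force_refresh_iso(*args, **kwargs):
    # Second version: returns an ISO 8601 formatted string.
    return datetime.now().isoformat()


int_key = force_refresh_int()
iso_key = force_refresh_iso()

# Round-trip the ISO key to confirm it is a well-formed timestamp string.
parsed = datetime.fromisoformat(iso_key)
```

The first returns an int and the second a str, which matches the `input_type=int` complaint in the validation error going away after the change.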