Scott Zelenka
03/17/2020, 9:17 PMMarch 17th 2020 at 5:13:44pm | prefect.CloudTaskRunner
INFO
Task 'sfdc_download_data': Starting task run...March 17th 2020 at 5:13:44pm | prefect.CloudTaskRunner
ERROR
Unexpected error: KeyError('text')
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 225, in check_task_is_cached
for key, res in (candidate_state.cached_inputs or {}).items()
File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 225, in <dictcomp>
for key, res in (candidate_state.cached_inputs or {}).items()
KeyError: 'text'
@task(
max_retries=3,
retry_delay=datetime.timedelta(minutes=5),
cache_for=datetime.timedelta(days=1),
cache_validator=cache_validators.duration_only
)
def sfdc_download_data(
url: typing.Union[str, Parameter]
) -> typing.Union[str, Result]:
...
text
parameter in this Task ... where is the task_runner getting this cached_input from?josh
03/17/2020, 9:48 PMtext
in relation to this task? To me it looks like the cache duration has not passed so the cached inputs are retrieved from cloud but then the inputs to your task runner no longer have that text attributeScott Zelenka
03/17/2020, 10:06 PMtext
and url
parameter.
My assumption was that if it detected a change in the task inputs, it would invalidate the cache and recompute it. Or in this case, if it failed to load the previous cache, it would invalidate and retry.
How does one force an invalidation of the cache without waiting for the original duration setting?josh
03/17/2020, 11:34 PM